[GitHub] [carbondata] niuge01 opened a new pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

67 messages

[GitHub] [carbondata] jackylk commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
jackylk commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375026570
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageCommand.scala
 ##########
 @@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.io.InputStreamReader
+import java.util
+import java.util.Collections
+import java.util.concurrent.Executors
+
+import scala.collection.JavaConverters._
+
+import com.google.gson.Gson
+import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.statusmanager.StageInput
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Delete carbon data files of table stages.
+ *
+ * @param databaseNameOp database name
+ * @param tableName      table name
+ */
+case class CarbonDeleteStageCommand(
 
 Review comment:
   ```suggestion
   case class CarbonDeleteStageFilesCommand(
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services

[GitHub] [carbondata] jackylk commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375029238
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageCommand.scala
 ##########
 @@ -0,0 +1,192 @@
+case class CarbonDeleteStageCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String]
+) extends DataCommand {
+
+  @transient var LOGGER: Logger = _
+
+  override def processData(spark: SparkSession): Seq[Row] = {
+    LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    Checker.validateTableExists(databaseNameOp, tableName, spark)
+    val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark)
+    val configuration = spark.sessionState.newHadoopConf()
+    FileFactory.getConfiguration.addResource(configuration)
+    setAuditTable(table)
+    if (!table.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
+    }
+    if (table.isChildTableForMV) {
+      throw new MalformedCarbonCommandException("Unsupported operation on MV table")
+    }
+    val tablePath = table.getTablePath
+    val startTime = System.currentTimeMillis()
+    val stageDataFileLocation = options.get("data_file_location")
+    if (stageDataFileLocation.isEmpty) {
+      throw new MalformedCarbonCommandException("Option [data_file_location] is not specified.")
+    }
+    val stageDataFileActiveTime = try {
+      Integer.valueOf(options.getOrElse("data_file_retain_time_second", "0")) * 1000
+    } catch {
+      case _: NumberFormatException =>
+        throw new MalformedCarbonCommandException(
+          "Option [data_file_retain_time_second] is not a number.")
+    }
+    if (stageDataFileActiveTime < 0) {
+      throw new MalformedCarbonCommandException(
+        "Option [data_file_retain_time_second] is negative.")
+    }
+    val stageDataFilesReferenced =
+      listStageDataFilesReferenced(listStageFiles(tablePath, configuration), configuration)
+    val stageDataFiles = listStageDataFiles(stageDataFileLocation.get, configuration)
+    stageDataFiles.map(
+      stageDataFile => {
+        // Which file will be deleted:
+        // 1. Not referenced by any stage file;
+        // 2. Has passed retain time.
+        if (stageDataFilesReferenced.contains(stageDataFile.getCanonicalPath)) {
+          null
+        } else if ((startTime - stageDataFile.getLastModifiedTime) < stageDataFileActiveTime) {
+          null
+        } else if (!stageDataFile.delete()) {
+          null
+        } else {
+          // Stage data file deleted.
+          stageDataFile.getCanonicalPath
+        }
+      }
+    ).filter(
+      stageDataFile => stageDataFile != null
+    ).map(
+      stageDataFile => Row.fromSeq(Seq(stageDataFile))
+    )
+    Seq.empty
+  }
+
+  private def listStageFiles(tablePath: String, configuration: Configuration): Seq[CarbonFile] = {
 
 Review comment:
   Is this for listing stage metadata files? It is not easy to tell the difference from `listStageDataFiles`
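The selection rule in the quoted `processData` can be written as a pure function. A file is kept if a stage file references it or if it is newer than the retain time; otherwise the command tries to delete it. This is a minimal sketch, not the PR's code. `StageDataFile` and `deleteUnreferencedStageDataFiles` are hypothetical names, and the `delete` callback stands in for `CarbonFile.delete()`.

The sketch also surfaces one issue in the quoted draft. It builds the result rows and then returns `Seq.empty`, so the deleted paths are discarded. Here they are returned instead, so the command could emit them, for example as `Row(path)`.

```scala
// Hypothetical stand-in for CarbonFile: canonical path plus modification time.
final case class StageDataFile(path: String, lastModifiedMillis: Long)

// Returns the paths that were actually deleted. A file is a candidate when
// (1) no stage file references it and (2) it is at least retainMillis old
// relative to startTimeMillis; it is reported only if `delete` succeeds.
def deleteUnreferencedStageDataFiles(
    dataFiles: Seq[StageDataFile],
    referencedPaths: Set[String],
    startTimeMillis: Long,
    retainMillis: Long,
    delete: StageDataFile => Boolean): Seq[String] = {
  dataFiles
    .filterNot(file => referencedPaths.contains(file.path))                  // rule 1
    .filter(file => startTimeMillis - file.lastModifiedMillis >= retainMillis) // rule 2
    .filter(delete)                                                          // keep only files really removed
    .map(_.path)
}
```

Chaining filters replaces the `null` sentinels. That also removes the need for the trailing `filter(stageDataFile => stageDataFile != null)` step.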


[GitHub] [carbondata] jackylk commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375029360
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageCommand.scala
 ##########
 @@ -0,0 +1,192 @@
+
+  private def listStageFiles(tablePath: String, configuration: Configuration): Seq[CarbonFile] = {
+    val stagePath = CarbonTablePath.getStageDir(tablePath)
+    val stageDirectory = FileFactory.getCarbonFile(stagePath, configuration)
+    if (stageDirectory.exists()) {
+      stageDirectory.listFiles().filter { file =>
+        !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
+      }
+    } else {
+      Seq.empty
+    }
+  }
+
+  private def listStageDataFiles(location: String,
 
 Review comment:
   Move `location` to the next line.
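Following the review comment, a signature with one parameter per line looks like the sketch below. It also mirrors the behaviour of the draft's `listStageDataFiles`: recursively list the files under the location, or log a warning and return nothing when the location is missing. The sketch assumes, as `listFiles(true)` suggests, that only files and not directories are wanted.

Two substitutions keep the example self-contained, and both are assumptions:

- `java.io.File` replaces `CarbonFile`/`FileFactory`. The real code goes through Hadoop-compatible file systems.
- The `warn` callback replaces `LOGGER.warn`. The warning text is also reworded from the draft's "is not exists".

```scala
import java.io.File

// Recursively collects regular files under `location`. When the location does
// not exist, reports it through `warn` and returns an empty Seq.
def listStageDataFiles(
    location: String,
    warn: String => Unit): Seq[File] = {
  val root = new File(location)
  if (!root.exists()) {
    warn(s"Stage data file location does not exist: $location")
    Seq.empty
  } else {
    def walk(file: File): Seq[File] =
      if (file.isDirectory) {
        // listFiles() returns null on I/O errors, so guard it with Option.
        Option(file.listFiles()).map(_.toSeq).getOrElse(Seq.empty).flatMap(walk)
      } else {
        Seq(file)
      }
    walk(root)
  }
}
```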


[GitHub] [carbondata] jackylk commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375029418
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageCommand.scala
 ##########
 @@ -0,0 +1,192 @@
+
+  private def listStageDataFiles(location: String,
+      configuration: Configuration): Seq[CarbonFile] = {
+    val stageDataFileLocation = FileFactory.getCarbonFile(location, configuration)
+    if (!stageDataFileLocation.exists()) {
+      LOGGER.warn("Stage data file location is not exists. " + location)
+      Seq.empty
+    } else {
+      stageDataFileLocation.listFiles(true).asScala
+    }
+  }
+
+  private def listStageDataFilesReferenced(stageFiles: Seq[CarbonFile],
 
 Review comment:
   Move `stageFiles: Seq[CarbonFile]` to the next line.
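The body of `listStageDataFilesReferenced` is not quoted in this hunk. Judging from the imports (`Gson`, `StageInput`, `InputStreamReader`), it reads each stage file and collects the data files it points to. The sketch below assumes each stage input carries a base directory and a map from data-file name to size, modelled by the hypothetical `StageInputSketch`. The real `StageInput` fields may differ, and JSON decoding is omitted. The signature uses the one-parameter-per-line layout the reviewer asks for.

Whatever the real format, the paths built here must be normalized the same way as `getCanonicalPath` on the data-file side. `processData` compares the two sides by plain string equality.

```scala
// Hypothetical stand-in for StageInput: a base directory plus a map from
// data-file name to file size. The real class may be shaped differently.
final case class StageInputSketch(base: String, files: Map[String, Long])

// Collects the full paths of every data file referenced by the stage inputs.
def listStageDataFilesReferenced(
    stageInputs: Seq[StageInputSketch]): Set[String] = {
  stageInputs.flatMap { input =>
    // Drop a trailing slash so "/a/" and "/a" produce the same paths.
    val base = input.base.stripSuffix("/")
    input.files.keys.map(name => s"$base/$name")
  }.toSet
}
```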


[GitHub] [carbondata] niuge01 commented on issue #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
niuge01 commented on issue #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#issuecomment-582265267
 
 
   retest this please


[GitHub] [carbondata] niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375082395
 
 

 ##########
 File path: docs/dml-of-carbondata.md
 ##########
 @@ -446,6 +446,46 @@ CarbonData DML statements are documented here,which includes:
   ```
   DELETE FROM carbontable WHERE column1 IN (SELECT column11 FROM sourceTable2 WHERE column1 = 'USA')
   ```
+    
+### DELETE STAGE
+
+  This command allows us to delete data files which referenced by disabled table stages.
 
 Review comment:
   Accepted the suggestion and updated.
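For reference, the sketch below assembles a statement following the syntax quoted in the docs diff (`DELETE FROM TABLE [db_name.]table_name STAGE OPTIONS(...)`). `deleteStageSql` is a hypothetical helper. Writing keys and values as single-quoted literals is an assumption borrowed from other CarbonData `OPTIONS` clauses; the quoted docs do not specify it.

```scala
// Hypothetical helper: renders a DELETE ... STAGE statement from its parts.
// Values are not escaped, so this is only suitable for trusted input.
def deleteStageSql(
    dbName: Option[String],
    tableName: String,
    options: Seq[(String, String)]): String = {
  val table = dbName.fold(tableName)(db => s"$db.$tableName")
  val renderedOptions = options.map { case (k, v) => s"'$k'='$v'" }.mkString(", ")
  s"DELETE FROM TABLE $table STAGE OPTIONS($renderedOptions)"
}
```

The resulting string would then be passed to `spark.sql(...)` on a Carbon-enabled `SparkSession`.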


[GitHub] [carbondata] niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375082453
 
 

 ##########
 File path: docs/dml-of-carbondata.md
 ##########
 @@ -446,6 +446,46 @@ CarbonData DML statements are documented here,which includes:
   ```
   DELETE FROM carbontable WHERE column1 IN (SELECT column11 FROM sourceTable2 WHERE column1 = 'USA')
   ```
+    
+### DELETE STAGE
+
+  This command allows us to delete data files which referenced by disabled table stages.
+  ```
+  DELETE FROM TABLE [db_name.]table_name STAGE OPTIONS(property_name=property_value, ...)
+  ```  
+  **Supported Properties:**
+
+| Property                                                     | Description                                                  |
+| ------------------------------------------------------------ | ------------------------------------------------------------ |
+| [data_file_location](#data_file_location)                    | The data files location                                      |
+| [data_file_retain_time_second](#data_file_retain_time_second)| Data file retain time in second                              |
 
 Review comment:
   Accepted the suggestion and updated.
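The option checks in the quoted `processData` can be sketched without Spark:

- `data_file_location` is required.
- `data_file_retain_time_second` defaults to 0 and must be non-negative.

Errors come back as `Left` rather than a thrown `MalformedCarbonCommandException`, which keeps the sketch dependency-free. `parseDeleteStageOptions` and the "too large" message are hypothetical.

The sketch parses seconds as `Long`. In the draft, `Integer.valueOf(...) * 1000` is `Int` arithmetic. Any value of 2,147,484 seconds (about 24.9 days) or more therefore wraps around. It then either trips the "is negative" check or silently yields a wrong retain time.

```scala
import scala.util.Try

// Returns (data file location, retain time in milliseconds) or an error message.
def parseDeleteStageOptions(
    options: Map[String, String]): Either[String, (String, Long)] = {
  options.get("data_file_location") match {
    case None =>
      Left("Option [data_file_location] is not specified.")
    case Some(location) =>
      val rawSeconds = options.getOrElse("data_file_retain_time_second", "0")
      Try(rawSeconds.toLong).toOption match {
        case None =>
          Left("Option [data_file_retain_time_second] is not a number.")
        case Some(seconds) if seconds < 0 =>
          Left("Option [data_file_retain_time_second] is negative.")
        // Hypothetical guard so that seconds * 1000 cannot overflow a Long.
        case Some(seconds) if seconds > Long.MaxValue / 1000 =>
          Left("Option [data_file_retain_time_second] is too large.")
        case Some(seconds) =>
          Right((location, seconds * 1000L))
      }
  }
}
```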


[GitHub] [carbondata] niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375082944
 
 

 ##########
 File path: docs/dml-of-carbondata.md
 ##########
 @@ -446,6 +446,46 @@ CarbonData DML statements are documented here,which includes:
   ```
   DELETE FROM carbontable WHERE column1 IN (SELECT column11 FROM sourceTable2 WHERE column1 = 'USA')
   ```
+    
+### DELETE STAGE
+
+  This command allows us to delete data files which referenced by disabled table stages.
+  ```
+  DELETE FROM TABLE [db_name.]table_name STAGE OPTIONS(property_name=property_value, ...)
+  ```  
+  **Supported Properties:**
+
+| Property                                                     | Description                                                  |
+| ------------------------------------------------------------ | ------------------------------------------------------------ |
+| [data_file_location](#data_file_location)                    | The data files location                                      |
 
 Review comment:
   Accepted the suggestion and updated.


[GitHub] [carbondata] niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375083067
 
 

 ##########
 File path: docs/dml-of-carbondata.md
 ##########
 @@ -446,6 +446,46 @@ CarbonData DML statements are documented here,which includes:
   ```
   DELETE FROM carbontable WHERE column1 IN (SELECT column11 FROM sourceTable2 WHERE column1 = 'USA')
   ```
+    
+### DELETE STAGE
+
+  This command allows us to delete data files which referenced by disabled table stages.
+  ```
+  DELETE FROM TABLE [db_name.]table_name STAGE OPTIONS(property_name=property_value, ...)
+  ```  
+  **Supported Properties:**
+
+| Property                                                     | Description                                                  |
+| ------------------------------------------------------------ | ------------------------------------------------------------ |
+| [data_file_location](#data_file_location)                    | The data files location                                      |
+| [data_file_retain_time_second](#data_file_retain_time_second)| Data file retain time in second                              |
+
+-
+  You can use the following options to delete data:
+
+  - ##### data_file_location:
+    The data files location, the command will scan the location, and delete files which can be deleted.
 
 Review comment:
   Accepted the suggestion and updated.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services
Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375083195
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 ##########
 @@ -218,6 +218,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         }
     }
 
+  protected lazy val options: Parser[(String, String)] =
 
 Review comment:
   Done.
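The quoted hunk shows only the declaration of the new `options` rule in `CarbonDDLSqlParser`. Its type, `Parser[(String, String)]`, means one key/value pair. The sketch below illustrates what such a rule accepts, without dependencies. It pulls `'key'='value'` pairs out of an `OPTIONS(...)` body with a regular expression instead of Scala parser combinators. This is a swapped-in technique, not the PR's code, and it has these limits:

- It lower-cases keys. This is an assumption, consistent with `processData` looking up lower-case names.
- It does not handle quotes inside values.
- It skips malformed fragments instead of rejecting them, as a real parser would.

```scala
import java.util.Locale
import scala.util.matching.Regex

// One 'key'='value' pair; neither side may contain a single quote here.
val OptionPair: Regex = """'([^']*)'\s*=\s*'([^']*)'""".r

// Extracts every pair in the clause, lower-casing keys; later duplicates win.
def parseOptions(
    clause: String): Map[String, String] =
  OptionPair.findAllMatchIn(clause)
    .map(m => m.group(1).trim.toLowerCase(Locale.ROOT) -> m.group(2))
    .toMap
```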


[GitHub] [carbondata] niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375083306
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageCommand.scala
 ##########
 @@ -0,0 +1,192 @@
+case class CarbonDeleteStageCommand(
 
 Review comment:
   Accepted the suggestion and updated.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services
[GitHub] [carbondata] niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375083795
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageCommand.scala
 ##########
 @@ -0,0 +1,192 @@
+package org.apache.spark.sql.execution.command.management
+
+import java.io.InputStreamReader
+import java.util
+import java.util.Collections
+import java.util.concurrent.Executors
+
+import scala.collection.JavaConverters._
+
+import com.google.gson.Gson
+import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.statusmanager.StageInput
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Delete carbon data files of table stages.
+ *
+ * @param databaseNameOp database name
+ * @param tableName      table name
+ */
+case class CarbonDeleteStageCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String]
+) extends DataCommand {
+
+  @transient var LOGGER: Logger = _
+
+  override def processData(spark: SparkSession): Seq[Row] = {
+    LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    Checker.validateTableExists(databaseNameOp, tableName, spark)
+    val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark)
+    val configuration = spark.sessionState.newHadoopConf()
+    FileFactory.getConfiguration.addResource(configuration)
+    setAuditTable(table)
+    if (!table.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
+    }
+    if (table.isChildTableForMV) {
+      throw new MalformedCarbonCommandException("Unsupported operation on MV table")
+    }
+    val tablePath = table.getTablePath
+    val startTime = System.currentTimeMillis()
+    val stageDataFileLocation = options.get("data_file_location")
+    if (stageDataFileLocation.isEmpty) {
+      throw new MalformedCarbonCommandException("Option [data_file_location] is not specified.")
+    }
+    val stageDataFileActiveTime = try {
+      Integer.valueOf(options.getOrElse("data_file_retain_time_second", "0")) * 1000
+    } catch {
+      case _: NumberFormatException =>
+        throw new MalformedCarbonCommandException(
+          "Option [data_file_retain_time_second] is not a number.")
+    }
+    if (stageDataFileActiveTime < 0) {
+      throw new MalformedCarbonCommandException(
+        "Option [data_file_retain_time_second] is negative.")
+    }
+    val stageDataFilesReferenced =
+      listStageDataFilesReferenced(listStageFiles(tablePath, configuration), configuration)
+    val stageDataFiles = listStageDataFiles(stageDataFileLocation.get, configuration)
+    stageDataFiles.map(
+      stageDataFile => {
+        // Which file will be deleted:
+        // 1. Not referenced by any stage file;
+        // 2. Has passed retain time.
+        if (stageDataFilesReferenced.contains(stageDataFile.getCanonicalPath)) {
+          null
+        } else if ((startTime - stageDataFile.getLastModifiedTime) < stageDataFileActiveTime) {
+          null
+        } else if (!stageDataFile.delete()) {
+          null
+        } else {
+          // Stage data file deleted.
+          stageDataFile.getCanonicalPath
+        }
+      }
+    ).filter(
+      stageDataFile => stageDataFile != null
+    ).map(
+      stageDataFile => Row.fromSeq(Seq(stageDataFile))
+    )
+    Seq.empty
+  }
+
+  private def listStageFiles(tablePath: String, configuration: Configuration): Seq[CarbonFile] = {
 
 Review comment:
   The method name has been changed to listStageMetadataFiles.
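The following is a minimal sketch of what listStageMetadataFiles does in the quoted hunk: list the table's stage directory and drop the success marker files. It runs on the local filesystem with java.io.File rather than FileFactory/CarbonFile. The stage directory and the success suffix are passed in as parameters. The ".success" value in the usage note is a hypothetical placeholder, because the value of CarbonTablePath.SUCCESS_FILE_SUBFIX is not shown here.

```scala
import java.io.File

object StageMetadataListing {
  // Hypothetical local stand-in for listStageMetadataFiles:
  // stageDir replaces CarbonTablePath.getStageDir(tablePath) and
  // successSuffix replaces CarbonTablePath.SUCCESS_FILE_SUBFIX.
  def listStageMetadataFiles(stageDir: File, successSuffix: String): Seq[File] =
    if (stageDir.exists()) {
      // File.listFiles() returns null if stageDir is not a directory or on I/O error.
      Option(stageDir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
        .filter(file => !file.getName.endsWith(successSuffix))
    } else {
      // No stage directory means no stage metadata files.
      Seq.empty
    }
}
```

For example, on a directory that holds s1 and s1.success, listStageMetadataFiles(dir, ".success") returns only s1.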

[GitHub] [carbondata] niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375083905
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageCommand.scala
 ##########
 @@ -0,0 +1,192 @@
+package org.apache.spark.sql.execution.command.management
+
+import java.io.InputStreamReader
+import java.util
+import java.util.Collections
+import java.util.concurrent.Executors
+
+import scala.collection.JavaConverters._
+
+import com.google.gson.Gson
+import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.statusmanager.StageInput
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Delete carbon data files of table stages.
+ *
+ * @param databaseNameOp database name
+ * @param tableName      table name
+ */
+case class CarbonDeleteStageCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String]
+) extends DataCommand {
+
+  @transient var LOGGER: Logger = _
+
+  override def processData(spark: SparkSession): Seq[Row] = {
+    LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    Checker.validateTableExists(databaseNameOp, tableName, spark)
+    val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark)
+    val configuration = spark.sessionState.newHadoopConf()
+    FileFactory.getConfiguration.addResource(configuration)
+    setAuditTable(table)
+    if (!table.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
+    }
+    if (table.isChildTableForMV) {
+      throw new MalformedCarbonCommandException("Unsupported operation on MV table")
+    }
+    val tablePath = table.getTablePath
+    val startTime = System.currentTimeMillis()
+    val stageDataFileLocation = options.get("data_file_location")
+    if (stageDataFileLocation.isEmpty) {
+      throw new MalformedCarbonCommandException("Option [data_file_location] is not specified.")
+    }
+    val stageDataFileActiveTime = try {
+      Integer.valueOf(options.getOrElse("data_file_retain_time_second", "0")) * 1000
+    } catch {
+      case _: NumberFormatException =>
+        throw new MalformedCarbonCommandException(
+          "Option [data_file_retain_time_second] is not a number.")
+    }
+    if (stageDataFileActiveTime < 0) {
+      throw new MalformedCarbonCommandException(
+        "Option [data_file_retain_time_second] is negative.")
+    }
+    val stageDataFilesReferenced =
+      listStageDataFilesReferenced(listStageFiles(tablePath, configuration), configuration)
+    val stageDataFiles = listStageDataFiles(stageDataFileLocation.get, configuration)
+    stageDataFiles.map(
+      stageDataFile => {
+        // Which file will be deleted:
+        // 1. Not referenced by any stage file;
+        // 2. Has passed retain time.
+        if (stageDataFilesReferenced.contains(stageDataFile.getCanonicalPath)) {
+          null
+        } else if ((startTime - stageDataFile.getLastModifiedTime) < stageDataFileActiveTime) {
+          null
+        } else if (!stageDataFile.delete()) {
+          null
+        } else {
+          // Stage data file deleted.
+          stageDataFile.getCanonicalPath
+        }
+      }
+    ).filter(
+      stageDataFile => stageDataFile != null
+    ).map(
+      stageDataFile => Row.fromSeq(Seq(stageDataFile))
+    )
+    Seq.empty
+  }
+
+  private def listStageFiles(tablePath: String, configuration: Configuration): Seq[CarbonFile] = {
+    val stagePath = CarbonTablePath.getStageDir(tablePath)
+    val stageDirectory = FileFactory.getCarbonFile(stagePath, configuration)
+    if (stageDirectory.exists()) {
+      stageDirectory.listFiles().filter { file =>
+        !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
+      }
+    } else {
+      Seq.empty
+    }
+  }
+
+  private def listStageDataFiles(location: String,
 
 Review comment:
   Done.
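The listStageDataFiles logic can be approximated with java.nio. If the location is missing, it warns and returns nothing; otherwise it lists files recursively. This is not the CarbonFile API. The sketch assumes that listFiles(true) means a recursive listing of files, and it keeps only regular files. The warn callback is a hypothetical stand-in for LOGGER.warn.

```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

object StageDataListing {
  // Recursively list regular files under `location`.
  // If the location does not exist, report it through `warn` and return nothing.
  def listStageDataFiles(location: Path, warn: String => Unit): Seq[Path] =
    if (!Files.exists(location)) {
      warn("Stage data file location does not exist: " + location)
      Seq.empty
    } else {
      // Files.walk holds open directory handles, so close the stream when done.
      val stream = Files.walk(location)
      try stream.iterator().asScala.filter(p => Files.isRegularFile(p)).toList
      finally stream.close()
    }
}
```

Materializing the list with toList before closing the stream matters here. An iterator that escaped the try block would read from a closed stream.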

[GitHub] [carbondata] niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
niuge01 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375083961
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageCommand.scala
 ##########
 @@ -0,0 +1,192 @@
+package org.apache.spark.sql.execution.command.management
+
+import java.io.InputStreamReader
+import java.util
+import java.util.Collections
+import java.util.concurrent.Executors
+
+import scala.collection.JavaConverters._
+
+import com.google.gson.Gson
+import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.statusmanager.StageInput
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Delete carbon data files of table stages.
+ *
+ * @param databaseNameOp database name
+ * @param tableName      table name
+ */
+case class CarbonDeleteStageCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String]
+) extends DataCommand {
+
+  @transient var LOGGER: Logger = _
+
+  override def processData(spark: SparkSession): Seq[Row] = {
+    LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    Checker.validateTableExists(databaseNameOp, tableName, spark)
+    val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark)
+    val configuration = spark.sessionState.newHadoopConf()
+    FileFactory.getConfiguration.addResource(configuration)
+    setAuditTable(table)
+    if (!table.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
+    }
+    if (table.isChildTableForMV) {
+      throw new MalformedCarbonCommandException("Unsupported operation on MV table")
+    }
+    val tablePath = table.getTablePath
+    val startTime = System.currentTimeMillis()
+    val stageDataFileLocation = options.get("data_file_location")
+    if (stageDataFileLocation.isEmpty) {
+      throw new MalformedCarbonCommandException("Option [data_file_location] is not specified.")
+    }
+    val stageDataFileActiveTime = try {
+      Integer.valueOf(options.getOrElse("data_file_retain_time_second", "0")) * 1000
+    } catch {
+      case _: NumberFormatException =>
+        throw new MalformedCarbonCommandException(
+          "Option [data_file_retain_time_second] is not a number.")
+    }
+    if (stageDataFileActiveTime < 0) {
+      throw new MalformedCarbonCommandException(
+        "Option [data_file_retain_time_second] is negative.")
+    }
+    val stageDataFilesReferenced =
+      listStageDataFilesReferenced(listStageFiles(tablePath, configuration), configuration)
+    val stageDataFiles = listStageDataFiles(stageDataFileLocation.get, configuration)
+    stageDataFiles.map(
+      stageDataFile => {
+        // Which file will be deleted:
+        // 1. Not referenced by any stage file;
+        // 2. Has passed retain time.
+        if (stageDataFilesReferenced.contains(stageDataFile.getCanonicalPath)) {
+          null
+        } else if ((startTime - stageDataFile.getLastModifiedTime) < stageDataFileActiveTime) {
+          null
+        } else if (!stageDataFile.delete()) {
+          null
+        } else {
+          // Stage data file deleted.
+          stageDataFile.getCanonicalPath
+        }
+      }
+    ).filter(
+      stageDataFile => stageDataFile != null
+    ).map(
+      stageDataFile => Row.fromSeq(Seq(stageDataFile))
+    )
+    Seq.empty
+  }
+
+  private def listStageFiles(tablePath: String, configuration: Configuration): Seq[CarbonFile] = {
+    val stagePath = CarbonTablePath.getStageDir(tablePath)
+    val stageDirectory = FileFactory.getCarbonFile(stagePath, configuration)
+    if (stageDirectory.exists()) {
+      stageDirectory.listFiles().filter { file =>
+        !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
+      }
+    } else {
+      Seq.empty
+    }
+  }
+
+  private def listStageDataFiles(location: String,
+      configuration: Configuration): Seq[CarbonFile] = {
+    val stageDataFileLocation = FileFactory.getCarbonFile(location, configuration)
+    if (!stageDataFileLocation.exists()) {
+      LOGGER.warn("Stage data file location does not exist: " + location)
+      Seq.empty
+    } else {
+      stageDataFileLocation.listFiles(true).asScala
+    }
+  }
+
+  private def listStageDataFilesReferenced(stageFiles: Seq[CarbonFile],
 
 Review comment:
   Done.
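The body of listStageDataFilesReferenced is not quoted here, so the following is only an assumption-based sketch of the idea. It gathers the path of every data file named by any stage metadata file into a Set. That makes the later stageDataFilesReferenced.contains(...) check a hash lookup.

HypotheticalStageInput is an invented stand-in for the parsed StageInput JSON, with a base directory and a map of file names. The real StageInput fields may differ.

```scala
import java.nio.file.Paths

// Hypothetical stand-in for a parsed stage metadata file; not the real StageInput.
final case class HypotheticalStageInput(base: String, files: Map[String, Long])

object StageReferences {
  // Collect the normalized path of every data file referenced by any stage.
  // A Set makes each membership check during cleanup a hash lookup.
  def listStageDataFilesReferenced(stages: Seq[HypotheticalStageInput]): Set[String] =
    stages.iterator.flatMap { stage =>
      stage.files.keysIterator.map { name =>
        Paths.get(stage.base).resolve(name).normalize().toString
      }
    }.toSet
}
```

Normalizing the paths matters because the cleanup compares these strings against getCanonicalPath of the listed data files. Unnormalized paths, for example with a trailing slash, would never match.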

[GitHub] [carbondata] CarbonDataQA1 commented on issue #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#issuecomment-582270531
 
 
   Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/153/
   

[GitHub] [carbondata] CarbonDataQA1 commented on issue #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#issuecomment-582287845
 
 
   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1856/
   

[GitHub] [carbondata] jackylk commented on issue #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
jackylk commented on issue #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#issuecomment-582464917
 
 
   LGTM

[GitHub] [carbondata] QiangCai commented on issue #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
QiangCai commented on issue #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#issuecomment-582724402
 
 
   It is better to avoid specifying the path if the path can be any path.

[GitHub] [carbondata] kunal642 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
kunal642 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375649953
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageFilesCommand.scala
 ##########
 @@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.io.InputStreamReader
+import java.util
+import java.util.Collections
+import java.util.concurrent.Executors
+
+import scala.collection.JavaConverters._
+
+import com.google.gson.Gson
+import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.statusmanager.StageInput
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Delete carbon data files of table stages.
+ *
+ * @param databaseNameOp database name
+ * @param tableName      table name
+ */
+case class CarbonDeleteStageFilesCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String]
+) extends DataCommand {
+
+  @transient var LOGGER: Logger = _
+
+  override def processData(spark: SparkSession): Seq[Row] = {
+    LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    Checker.validateTableExists(databaseNameOp, tableName, spark)
+    val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark)
+    val configuration = spark.sessionState.newHadoopConf()
+    FileFactory.getConfiguration.addResource(configuration)
+    setAuditTable(table)
+    if (!table.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
+    }
+    if (table.isChildTableForMV) {
+      throw new MalformedCarbonCommandException("Unsupported operation on MV table")
+    }
+    val tablePath = table.getTablePath
+    val startTime = System.currentTimeMillis()
+    val stageDataFileLocation = options.get("location")
+    if (stageDataFileLocation.isEmpty) {
+      throw new MalformedCarbonCommandException("Option [location] is not specified.")
+    }
+    val stageDataFileActiveTime = try {
+      Integer.valueOf(options.getOrElse("retain_hour", "0")) * 3600000
+    } catch {
+      case _: NumberFormatException =>
+        throw new MalformedCarbonCommandException(
+          "Option [retain_hour] is not a number.")
+    }
+    if (stageDataFileActiveTime < 0) {
+      throw new MalformedCarbonCommandException(
+        "Option [retain_hour] is negative.")
+    }
+    val stageDataFilesReferenced =
+      listStageDataFilesReferenced(listStageMetadataFiles(tablePath, configuration), configuration)
+    val stageDataFiles = listStageDataFiles(stageDataFileLocation.get, configuration)
+    stageDataFiles.map(
+      stageDataFile => {
+        // Which file will be deleted:
+        // 1. Not referenced by any stage file;
+        // 2. Has passed retain time.
+        if (stageDataFilesReferenced.contains(stageDataFile.getCanonicalPath)) {
+          null
+        } else if ((startTime - stageDataFile.getLastModifiedTime) < stageDataFileActiveTime) {
+          null
+        } else if (!stageDataFile.delete()) {
+          null
+        } else {
+          // Stage data file deleted.
+          stageDataFile.getCanonicalPath
+        }
+      }
+    ).filter(
+      stageDataFile => stageDataFile != null
+    ).map(
+      stageDataFile => Row.fromSeq(Seq(stageDataFile))
 
 Review comment:
   Replace with .collect(); there is no need to loop twice.
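Following the .collect() suggestion, the map/filter/map chain can be written as a single pass. The sketch below also parses retain_hour with Long arithmetic. In the quoted code, Integer.valueOf(...) * 3600000 is Int multiplication, so any retain_hour of 597 or more overflows.

StageDataFileStub is a hypothetical model of a CarbonFile. IllegalArgumentException is used in place of MalformedCarbonCommandException so the sketch has no dependencies.

Note also that, as quoted, processData builds the result rows and then returns Seq.empty, so deleted paths are never reported. The command could return deleteExpired(...).map(p => Row.fromSeq(Seq(p))) instead.

```scala
// Hypothetical model of a stage data file; the real code works on CarbonFile.
final case class StageDataFileStub(path: String, lastModifiedMillis: Long, delete: () => Boolean)

object StageCleanup {
  // Parse retain_hour into milliseconds using Long arithmetic.
  // multiplyExact throws ArithmeticException instead of silently overflowing.
  def retainMillis(options: Map[String, String]): Long = {
    val hours =
      try options.getOrElse("retain_hour", "0").toLong
      catch {
        case _: NumberFormatException =>
          throw new IllegalArgumentException("Option [retain_hour] is not a number.")
      }
    if (hours < 0) throw new IllegalArgumentException("Option [retain_hour] is negative.")
    java.lang.Math.multiplyExact(hours, 3600000L)
  }

  // Single pass: keep only files that are unreferenced, older than the retain
  // window, and successfully deleted. && short-circuits, so delete() runs only
  // when the first two conditions hold.
  def deleteExpired(
      files: Seq[StageDataFileStub],
      referenced: Set[String],
      startTime: Long,
      retainMs: Long): Seq[String] =
    files.collect {
      case f if !referenced.contains(f.path) &&
          startTime - f.lastModifiedMillis >= retainMs &&
          f.delete() =>
        f.path
    }
}
```

Putting the side-effecting delete() in a pattern guard relies on collect evaluating each guard once per element. That holds for partial-function literals in the Scala 2.11+ collections, which call applyOrElse. A flatMap that returns an Option is an equally single-pass alternative if that feels too implicit.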

[GitHub] [carbondata] kunal642 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.

GitBox
In reply to this post by GitBox
kunal642 commented on a change in pull request #3602: [CARBONDATA-3676] Support clean carbon data files of stages.
URL: https://github.com/apache/carbondata/pull/3602#discussion_r375650699
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageFilesCommand.scala
 ##########
 @@ -0,0 +1,199 @@
+package org.apache.spark.sql.execution.command.management
+
+import java.io.InputStreamReader
+import java.util
+import java.util.Collections
+import java.util.concurrent.Executors
+
+import scala.collection.JavaConverters._
+
+import com.google.gson.Gson
+import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.statusmanager.StageInput
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Delete carbon data files of table stages.
+ *
+ * @param databaseNameOp database name
+ * @param tableName      table name
+ */
+case class CarbonDeleteStageFilesCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String]
+) extends DataCommand {
+
+  @transient var LOGGER: Logger = _
 
 Review comment:
   change to val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
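The suggested change can be sketched as follows, with java.util.logging standing in for LogServiceFactory so it runs without CarbonData on the classpath. The logger becomes a val initialized when the object is constructed, rather than a @transient var assigned at the start of processData. Helper methods can then log safely even if they run first.

DeleteStageFilesSketch is a hypothetical name. getClass.getName is used because Class.getCanonicalName returns null for local and anonymous classes, which can happen when a sketch like this is pasted into a script or REPL.

```scala
import java.util.logging.Logger

// Hypothetical stand-in for the command class, without the DataCommand parent.
final case class DeleteStageFilesSketch(
    databaseNameOp: Option[String],
    tableName: String,
    options: Map[String, String]) {

  // Initialized once at construction, so it is never null inside helpers.
  private val LOGGER: Logger = Logger.getLogger(getClass.getName)

  def loggerName: String = LOGGER.getName

  // Example helper that can log regardless of whether processData ran first.
  def warnMissing(location: String): Unit =
    LOGGER.warning("Stage data file location does not exist: " + location)
}
```

If the command object can be serialized, @transient lazy val is a common variant that avoids serializing the logger and re-creates it on first use.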
