diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 35d60bb514405..b8b044bbad30e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -50,7 +50,7 @@ import com.google.common.net.InetAddresses
 import org.apache.commons.codec.binary.Hex
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path, Trash}
 import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -269,6 +269,29 @@ private[spark] object Utils extends Logging {
     file.setExecutable(true, true)
   }
 
+  /**
+   * Move data to the trash if 'spark.sql.truncate.trash.enabled' is true; otherwise delete
+   * it permanently. If moving the data to the trash fails, fall back to hard deletion.
+   */
+  def moveToTrashOrDelete(
+      fs: FileSystem,
+      partitionPath: Path,
+      isTrashEnabled: Boolean,
+      hadoopConf: Configuration): Boolean = {
+    if (isTrashEnabled) {
+      logDebug(s"Trying to move data ${partitionPath.toString} to trash")
+      val isSuccess = Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
+      if (!isSuccess) {
+        logWarning(s"Failed to move data ${partitionPath.toString} to trash. " +
+          "Falling back to hard deletion")
+        return fs.delete(partitionPath, true)
+      }
+      isSuccess
+    } else {
+      fs.delete(partitionPath, true)
+    }
+  }
+
   /**
    * Create a directory given the abstract pathname
    * @return true, if the directory is successfully created; otherwise, return false.
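Note: the new helper leans on Hadoop's org.apache.hadoop.fs.Trash API, whose static
moveToAppropriateTrash returns false when the effective 'fs.trash.interval' is non-positive;
that false return is what triggers the hard-deletion fallback above. Below is a minimal
standalone sketch of the same pattern, assuming a local filesystem; the scratch path and
object name are hypothetical and not part of this patch.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path, Trash}

    object TrashFallbackSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        conf.set("fs.trash.interval", "5")             // retention in minutes; <= 0 disables trash
        val path = new Path("/tmp/trash-sketch-data")  // hypothetical scratch path
        val fs = FileSystem.get(path.toUri, conf)
        fs.mkdirs(path)
        // Same shape as Utils.moveToTrashOrDelete: try the trash first,
        // fall back to permanent deletion if the move is refused or fails.
        val moved = Trash.moveToAppropriateTrash(fs, path, conf)
        val removed = if (moved) moved else fs.delete(path, true)
        println(s"moved to trash: $moved, gone: $removed")
      }
    }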
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index efc1c9a297656..dca421a09da62 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2732,6 +2732,18 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val TRUNCATE_TRASH_ENABLED =
+    buildConf("spark.sql.truncate.trash.enabled")
+      .doc("This configuration decides whether, when truncating a table, the data files are " +
+        "moved to the trash directory or deleted permanently. The trash retention time is " +
+        "controlled by 'fs.trash.interval', and by default the server-side configuration " +
+        "value takes precedence over the client-side one. Note that if 'fs.trash.interval' " +
+        "is non-positive, the move is a no-op and a warning is logged. If the data fails to " +
+        "be moved to trash, Spark falls back to deleting it permanently.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -3350,6 +3362,8 @@ class SQLConf extends Serializable with Logging {
 
   def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)
 
+  def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 7aebdddf1d59c..f94c9712a31cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -48,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.util.Utils
 
 /**
  * A command to create a table with the same definition of the given existing table.
@@ -489,6 +490,7 @@ case class TruncateTableCommand(
     }
     val hadoopConf = spark.sessionState.newHadoopConf()
     val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
+    val isTrashEnabled = SQLConf.get.truncateTrashEnabled
     locations.foreach { location =>
       if (location.isDefined) {
         val path = new Path(location.get)
@@ -513,7 +515,7 @@ case class TruncateTableCommand(
           }
         }
 
-        fs.delete(path, true)
+        Utils.moveToTrashOrDelete(fs, path, isTrashEnabled, hadoopConf)
 
         // We should keep original permission/acl of the path.
         // For owner/group, only super-user can set it, for example on HDFS. Because
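Note: end to end, the behavior is driven entirely by the new SQL flag plus the Hadoop-side
retention setting. Below is a sketch of how a user would exercise the change, assuming a local
SparkSession; the app and table names are hypothetical and not part of this patch.

    import org.apache.spark.sql.SparkSession

    object TruncateToTrashExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("truncate-to-trash")
          .config("spark.sql.truncate.trash.enabled", "true") // new flag; defaults to false
          .getOrCreate()
        // Retention is a Hadoop setting, not a SQL conf; <= 0 would make the move a no-op.
        spark.sparkContext.hadoopConfiguration.set("fs.trash.interval", "5")

        spark.sql("CREATE TABLE demo_tab (col INT) USING parquet") // hypothetical table
        spark.sql("INSERT INTO demo_tab SELECT 1")
        // With the flag on, the table directory is moved under ~/.Trash/Current
        // instead of being deleted outright.
        spark.sql("TRUNCATE TABLE demo_tab")
        spark.stop()
      }
    }

The tests below assert exactly this contract: data lands in the trash when the flag is on and
the interval is positive, and is deleted permanently otherwise.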
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 17857a6ce173d..b8ac5079b7745 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -3101,6 +3101,81 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       assert(spark.sessionState.catalog.isRegisteredFunction(rand))
     }
   }
+
+  test("SPARK-32481 Move data to trash on truncate table if enabled") {
+    val trashIntervalKey = "fs.trash.interval"
+    withTable("tab1") {
+      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
+        sql("CREATE TABLE tab1 (col INT) USING parquet")
+        sql("INSERT INTO tab1 SELECT 1")
+        // scalastyle:off hadoopconfiguration
+        val hadoopConf = spark.sparkContext.hadoopConfiguration
+        // scalastyle:on hadoopconfiguration
+        val originalValue = hadoopConf.get(trashIntervalKey, "0")
+        val tablePath = new Path(spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
+
+        val fs = tablePath.getFileSystem(hadoopConf)
+        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
+        val trashPath = Path.mergePaths(trashCurrent, tablePath)
+        assert(!fs.exists(trashPath))
+        try {
+          hadoopConf.set(trashIntervalKey, "5")
+          sql("TRUNCATE TABLE tab1")
+        } finally {
+          hadoopConf.set(trashIntervalKey, originalValue)
+        }
+        assert(fs.exists(trashPath))
+        fs.delete(trashPath, true)
+      }
+    }
+  }
+
+  test("SPARK-32481 delete data permanently on truncate table if trash interval is non-positive") {
+    val trashIntervalKey = "fs.trash.interval"
+    withTable("tab1") {
+      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
+        sql("CREATE TABLE tab1 (col INT) USING parquet")
+        sql("INSERT INTO tab1 SELECT 1")
+        // scalastyle:off hadoopconfiguration
+        val hadoopConf = spark.sparkContext.hadoopConfiguration
+        // scalastyle:on hadoopconfiguration
+        val originalValue = hadoopConf.get(trashIntervalKey, "0")
+        val tablePath = new Path(spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
+
+        val fs = tablePath.getFileSystem(hadoopConf)
+        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
+        val trashPath = Path.mergePaths(trashCurrent, tablePath)
+        assert(!fs.exists(trashPath))
+        try {
+          hadoopConf.set(trashIntervalKey, "0")
+          sql("TRUNCATE TABLE tab1")
+        } finally {
+          hadoopConf.set(trashIntervalKey, originalValue)
+        }
+        assert(!fs.exists(trashPath))
+      }
+    }
+  }
+
+  test("SPARK-32481 Do not move data to trash on truncate table if disabled") {
+    withTable("tab1") {
+      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") {
+        sql("CREATE TABLE tab1 (col INT) USING parquet")
+        sql("INSERT INTO tab1 SELECT 1")
+        val hadoopConf = spark.sessionState.newHadoopConf()
+        val tablePath = new Path(spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
+
+        val fs = tablePath.getFileSystem(hadoopConf)
+        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
+        val trashPath = Path.mergePaths(trashCurrent, tablePath)
+        sql("TRUNCATE TABLE tab1")
+        assert(!fs.exists(trashPath))
+      }
+    }
+  }
 }
 
 object FakeLocalFsFileSystem {
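Note: the assertions above derive the expected trash location the same way Hadoop lays it out:
the original absolute path is re-rooted under the filesystem user's .Trash/Current directory.
A standalone sketch of that computation, with a hypothetical table location:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object TrashPathSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        val tablePath = new Path("/user/hive/warehouse/tab1") // hypothetical table location
        val fs = FileSystem.get(tablePath.toUri, conf)
        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
        // mergePaths concatenates the second path onto the first, preserving the layout.
        val trashPath = Path.mergePaths(trashCurrent, tablePath)
        println(trashPath) // e.g. file:/home/alice/.Trash/Current/user/hive/warehouse/tab1
      }
    }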