[SPARK-32481][CORE][SQL] Support truncate table to move data to trash #29552

Closed · wants to merge 4 commits
25 changes: 24 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -50,7 +50,7 @@ import com.google.common.net.InetAddresses
import org.apache.commons.codec.binary.Hex
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path, Trash}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -269,6 +269,29 @@ private[spark] object Utils extends Logging {
    file.setExecutable(true, true)
  }

  /**
   * Move data to the trash if 'spark.sql.truncate.trash.enabled' is true; otherwise,
   * delete the data permanently. If the move to trash fails, fall back to hard deletion.
   */
  def moveToTrashIfEnabled(
      fs: FileSystem,
      partitionPath: Path,
      isTrashEnabled: Boolean,
      hadoopConf: Configuration): Boolean = {
    if (isTrashEnabled) {
      logDebug(s"Will move data ${partitionPath.toString} to trash")
      val isSuccess = Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
      if (!isSuccess) {
        logWarning(s"Failed to move data ${partitionPath.toString} to trash. " +
          "Falling back to hard deletion")
        fs.delete(partitionPath, true)
      } else {
        isSuccess
      }
    } else {
      fs.delete(partitionPath, true)
    }
  }
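Note (editor's sketch, not part of this diff): a minimal way to exercise the new helper against the local filesystem. The package, object name, and path below are invented for illustration; since Utils is private[spark], the sketch assumes it compiles under the org.apache.spark package.

package org.apache.spark.demo // hypothetical package, needed because Utils is private[spark]

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.util.Utils

object MoveToTrashDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // A positive fs.trash.interval is required for the move to succeed; with the
    // default of 0, moveToAppropriateTrash reports failure and the helper falls
    // back to a permanent delete.
    conf.setInt("fs.trash.interval", 5)
    val path = new Path("/tmp/move-to-trash-demo") // illustrative path
    val fs = path.getFileSystem(conf)
    fs.mkdirs(path)
    val result = Utils.moveToTrashIfEnabled(fs, path, isTrashEnabled = true, conf)
    println(s"Data moved to trash (or deleted on fallback): $result")
  }
}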

  /**
   * Create a directory given the abstract pathname.
   * @return true, if the directory is successfully created; otherwise, return false.
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2722,6 +2722,18 @@ object SQLConf {
      .booleanConf
      .createWithDefault(false)

  val TRUNCATE_TRASH_ENABLED =
    buildConf("spark.sql.truncate.trash.enabled")
      .doc("This configuration decides whether, when truncating a table, data files are " +
        "moved to the trash directory or deleted permanently. The trash retention time is " +
        "controlled by fs.trash.interval; by default, the server-side configuration value " +
        "takes precedence over the client-side one. Note that if fs.trash.interval is " +
        "non-positive, moving to trash is a no-op: a warning is logged and the data is " +
        "deleted permanently. Likewise, if the data fails to be moved to trash, Spark " +
        "falls back to permanent deletion.")
      .version("3.1.0")
      .booleanConf
      .createWithDefault(false)
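Note (editor's sketch, not part of this diff): toggling the new flag per session. The SparkSession name spark and the table my_table are illustrative.

// Enable trash-backed truncation for this session (illustrative names).
spark.conf.set("spark.sql.truncate.trash.enabled", "true")
// With fs.trash.interval > 0 on the cluster, the table's files are moved to
// the user's .Trash/Current directory instead of being deleted outright.
spark.sql("TRUNCATE TABLE my_table")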

  /**
   * Holds information about keys that have been deprecated.
   *
@@ -3334,6 +3346,8 @@ class SQLConf extends Serializable with Logging {

  def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)

  def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)

  /** ********************** SQLConf functionality methods ************ */

  /** Set Spark SQL configuration properties. */
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -48,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils

/**
* A command to create a table with the same definition of the given existing table.
@@ -489,6 +490,7 @@ case class TruncateTableCommand(
    }
    val hadoopConf = spark.sessionState.newHadoopConf()
    val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
    val isTrashEnabled = SQLConf.get.truncateTrashEnabled
    locations.foreach { location =>
      if (location.isDefined) {
        val path = new Path(location.get)
@@ -513,7 +515,7 @@
          }
        }

-        fs.delete(path, true)
+        Utils.moveToTrashIfEnabled(fs, path, isTrashEnabled, hadoopConf)

        // We should keep original permission/acl of the path.
        // For owner/group, only super-user can set it, for example on HDFS. Because
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -3101,6 +3101,81 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
      assert(spark.sessionState.catalog.isRegisteredFunction(rand))
    }
  }

test("SPARK-32481 Move data to trash on truncate table if enabled") {
val trashIntervalKey = "fs.trash.interval"
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
// scalastyle:off hadoopconfiguration
val hadoopConf = spark.sparkContext.hadoopConfiguration
// scalastyle:on hadoopconfiguration
val originalValue = hadoopConf.get(trashIntervalKey, "0")
val tablePath = new Path(spark.sessionState.catalog
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)

val fs = tablePath.getFileSystem(hadoopConf)
val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
Udbhav30 marked this conversation as resolved.
Show resolved Hide resolved
val trashPath = Path.mergePaths(trashCurrent, tablePath)
assert(!fs.exists(trashPath))
try {
hadoopConf.set(trashIntervalKey, "5")
sql("TRUNCATE TABLE tab1")
} finally {
hadoopConf.set(trashIntervalKey, originalValue)
}
assert(fs.exists(trashPath))
fs.delete(trashPath, true)
}
}
}
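Note (editor's sketch, not part of this diff): the expected trash location above is computed by grafting the table path onto the user's trash root. A standalone illustration of that computation, with an invented table path:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object TrashPathDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val tablePath = new Path("/user/hive/warehouse/tab1") // illustrative location
    val fs = tablePath.getFileSystem(conf)
    val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
    // mergePaths concatenates the path components, yielding something like
    // file:/home/alice/.Trash/Current/user/hive/warehouse/tab1
    val trashPath = Path.mergePaths(trashCurrent, tablePath)
    println(trashPath)
  }
}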

test("SPARK-32481 delete data permanently on truncate table if trash interval is non-positive") {
val trashIntervalKey = "fs.trash.interval"
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
// scalastyle:off hadoopconfiguration
val hadoopConf = spark.sparkContext.hadoopConfiguration
// scalastyle:on hadoopconfiguration
val originalValue = hadoopConf.get(trashIntervalKey, "0")
val tablePath = new Path(spark.sessionState.catalog
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)

val fs = tablePath.getFileSystem(hadoopConf)
val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
val trashPath = Path.mergePaths(trashCurrent, tablePath)
assert(!fs.exists(trashPath))
try {
hadoopConf.set(trashIntervalKey, "0")
sql("TRUNCATE TABLE tab1")
} finally {
hadoopConf.set(trashIntervalKey, originalValue)
}
assert(!fs.exists(trashPath))
}
}
}

test("SPARK-32481 Do not move data to trash on truncate table if disabled") {
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
val hadoopConf = spark.sessionState.newHadoopConf()
val tablePath = new Path(spark.sessionState.catalog
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)

val fs = tablePath.getFileSystem(hadoopConf)
val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
val trashPath = Path.mergePaths(trashCurrent, tablePath)
sql("TRUNCATE TABLE tab1")
assert(!fs.exists(trashPath))
}
}
}
}

object FakeLocalFsFileSystem {