Commit

add tests and minor fixes
Udbhav30 committed Aug 22, 2020
1 parent a9806aa commit 7447f18
Showing 3 changed files with 28 additions and 5 deletions.
core/src/main/scala/org/apache/spark/util/Utils.scala (1 addition & 1 deletion)
@@ -278,7 +278,7 @@ private[spark] object Utils extends Logging {
       partitionPath: Path,
       isTrashEnabled: Boolean,
       hadoopConf: Configuration): Unit = {
-    if (isTrashEnabled && hadoopConf.getInt("fs.trash.interval", 0) > 0) {
+    if (isTrashEnabled) {
       logDebug(s"will move data ${partitionPath.toString} to trash")
       val isSuccess = Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
       if (!isSuccess) {
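
Context on the simplified guard: Hadoop's Trash.moveToAppropriateTrash already consults fs.trash.interval itself (honoring the server-side value when one is set) and returns false without moving anything when trash is disabled, so the extra interval check in the caller was redundant. A minimal Scala sketch of that contract, using only the Hadoop API visible in the hunk (the helper name tryMoveToTrash is illustrative, not Spark code):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path, Trash}

    // Sketch of the Hadoop contract this change relies on: when trash is
    // disabled (fs.trash.interval <= 0), moveToAppropriateTrash returns
    // false and moves nothing, so the caller needs no interval check.
    def tryMoveToTrash(fs: FileSystem, path: Path, conf: Configuration): Boolean =
      Trash.moveToAppropriateTrash(fs, path, conf)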
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (5 additions & 4 deletions)

@@ -2703,10 +2703,11 @@ object SQLConf {

   val TRUNCATE_TRASH_ENABLED =
     buildConf("spark.sql.truncate.trash.enabled")
-      .doc("This Configuration will decide whether move files to trash on truncate table given, " +
-        "'fs.trash.interval' is positive in Hadoop Configuration. " +
-        "Note that, in Hadoop conf if server side has this configured then the client side " +
-        "one will be ignored. ")
+      .doc("This configuration decides whether, when truncating a table, data files will be " +
+        "moved to the trash directory or deleted permanently. The trash retention time is " +
+        "controlled by fs.trash.interval, and by default the server-side configuration " +
+        "value takes precedence over the client-side one. Note that if fs.trash.interval " +
+        "is non-positive, this will be a no-op and log a warning message.")
       .booleanConf
       .createWithDefault(false)

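A hypothetical spark-shell sketch of the documented behavior (the table name t is illustrative; this session is not part of the commit):

    // Requires fs.trash.interval > 0 in the effective Hadoop configuration;
    // otherwise the trash move is a no-op plus a warning, per the doc above.
    spark.conf.set("spark.sql.truncate.trash.enabled", "true")
    spark.sql("CREATE TABLE t (col INT)")
    spark.sql("INSERT INTO t SELECT 1")
    spark.sql("TRUNCATE TABLE t")  // data files move to the user's trash directory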
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala (22 additions & 0 deletions)

@@ -3073,6 +3073,28 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       assert(!spark.sessionState.catalog.isRegisteredFunction(func))
     }
   }
+
+  test("Move data to trash on truncate table if enabled") {
+    withTable("tab1") {
+      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
+        sql("CREATE TABLE tab1 (col INT)")
+        sql("INSERT INTO tab1 SELECT 1")
+
+        val tablePath = new Path(spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
+        val hadoopConf = spark.sparkContext.hadoopConfiguration
+        val fs = tablePath.getFileSystem(hadoopConf)
+        // trash interval must be positive in the Hadoop conf for the move to happen
+        hadoopConf.setInt("fs.trash.interval", 5)
+
+        val trashRoot = fs.getTrashRoot(tablePath)
+        assert(!fs.exists(trashRoot))
+        sql("TRUNCATE TABLE tab1")
+        assert(fs.exists(trashRoot))
+        fs.delete(trashRoot, true)
+      }
+    }
+  }
 }

object FakeLocalFsFileSystem {
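For readers unfamiliar with the trash layout the test asserts on, a self-contained sketch of the Hadoop APIs involved (the demo object, path, and interval are illustrative assumptions):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path, Trash}

    object TrashDemo {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        conf.setInt("fs.trash.interval", 5) // retention in minutes; must be > 0
        val fs = FileSystem.getLocal(conf)
        val data = new Path("/tmp/trash-demo-data") // hypothetical path
        fs.create(data).close()
        // getTrashRoot resolves the calling user's trash directory (e.g.
        // <home>/.Trash); moveToAppropriateTrash relocates the path under
        // <trash root>/Current instead of deleting it.
        val trashRoot = fs.getTrashRoot(data)
        val moved = Trash.moveToAppropriateTrash(fs, data, conf)
        println(s"moved=$moved, trash root=$trashRoot")
      }
    }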
