Skip to content

Commit

Permalink
[SPARK-1860] More conservative app directory cleanup.
Browse files Browse the repository at this point in the history
First contribution to the project, so apologize for any significant errors.

This PR addresses [SPARK-1860]. The application directories are now cleaned up in a more conservative manner.

Previously, app-* directories were cleaned up if the directory's timestamp was older than a given time. However, the timestamp on a directory does not reflect the modification times of the files in that directory. Therefore, app-* directories were wiped out even if the files inside them were created recently and possibly being used by Executor tasks.

The solution is to change the cleanup logic to inspect all files within the app-* directory and only eliminate the app-* directory if all files in the directory are stale.

Author: mcheah <mcheah@palantir.com>

Closes apache#2609 from mccheah/worker-better-app-dir-cleanup and squashes the following commits:

87b5d03 [mcheah] [SPARK-1860] Using more string interpolation. Better error logging.
802473e [mcheah] [SPARK-1860] Cleaning up the logs generated when cleaning directories.
e0a1f2e [mcheah] [SPARK-1860] Fixing broken unit test.
77a9de0 [mcheah] [SPARK-1860] More conservative app directory cleanup.
  • Loading branch information
mccheah authored and aarondav committed Oct 3, 2014
1 parent 79e45c9 commit cf1d32e
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private[spark] class ExecutorRunner(
val workerId: String,
val host: String,
val sparkHome: File,
val workDir: File,
val executorDir: File,
val workerUrl: String,
val conf: SparkConf,
var state: ExecutorState.Value)
Expand Down Expand Up @@ -130,12 +130,6 @@ private[spark] class ExecutorRunner(
*/
def fetchAndRunExecutor() {
try {
// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}

// Launch the process
val command = getCommandSeq
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
Expand Down
37 changes: 31 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
package org.apache.spark.deploy.worker

import java.io.File
import java.io.IOException
import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import scala.language.postfixOps

import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.commons.io.FileUtils

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
Expand Down Expand Up @@ -191,6 +194,7 @@ private[spark] class Worker(
changeMaster(masterUrl, masterWebUiUrl)
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
if (CLEANUP_ENABLED) {
logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
}
Expand All @@ -201,10 +205,23 @@ private[spark] class Worker(
case WorkDirCleanup =>
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
val cleanupFuture = concurrent.future {
logInfo("Cleaning up oldest application directories in " + workDir + " ...")
Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
.foreach(Utils.deleteRecursively)
val appDirs = workDir.listFiles()
if (appDirs == null) {
throw new IOException("ERROR: Failed to list files in " + appDirs)
}
appDirs.filter { dir =>
// the directory is used by an application - check that the application is not running
// when cleaning up
val appIdFromDir = dir.getName
val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
dir.isDirectory && !isAppStillRunning &&
!Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
}.foreach { dir =>
logInfo(s"Removing directory: ${dir.getPath}")
Utils.deleteRecursively(dir)
}
}

cleanupFuture onFailure {
case e: Throwable =>
logError("App dir cleanup failed: " + e.getMessage, e)
Expand Down Expand Up @@ -233,21 +250,29 @@ private[spark] class Worker(
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}

val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
memoryUsed += memory_
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
} catch {
case e: Exception => {
logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
}
master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
Some(e.toString), None)
}
}
}
Expand Down
21 changes: 13 additions & 8 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import scala.util.control.{ControlThrowable, NonFatal}

import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.TrueFileFilter
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.log4j.PropertyConfigurator
Expand Down Expand Up @@ -705,17 +707,20 @@ private[spark] object Utils extends Logging {
}

/**
* Finds all the files in a directory whose last modified time is older than cutoff seconds.
* @param dir must be the path to a directory, or IllegalArgumentException is thrown
* @param cutoff measured in seconds. Files older than this are returned.
* Determines if a directory contains any files newer than cutoff seconds.
*
* @param dir must be the path to a directory, or IllegalArgumentException is thrown
* @param cutoff measured in seconds. Returns true if there are any files in dir newer than this.
*/
def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
val currentTimeMillis = System.currentTimeMillis
if (dir.isDirectory) {
val files = listFilesSafely(dir)
files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
if (!dir.isDirectory) {
throw new IllegalArgumentException (dir + " is not a directory!")
} else {
throw new IllegalArgumentException(dir + " is not a directory!")
val files = FileUtils.listFilesAndDirs(dir, TrueFileFilter.TRUE, TrueFileFilter.TRUE)
val cutoffTimeInMillis = (currentTimeMillis - (cutoff * 1000))
val newFiles = files.filter { _.lastModified > cutoffTimeInMillis }
newFiles.nonEmpty
}
}

Expand Down
23 changes: 17 additions & 6 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -189,17 +189,28 @@ class UtilsSuite extends FunSuite {
assert(Utils.getIteratorSize(iterator) === 5L)
}

test("findOldFiles") {
test("doesDirectoryContainFilesNewerThan") {
// create some temporary directories and files
val parent: File = Utils.createTempDir()
val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
val child2: File = Utils.createTempDir(parent.getCanonicalPath)
// set the last modified time of child1 to 10 secs old
child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
val child3: File = Utils.createTempDir(child1.getCanonicalPath)
// set the last modified time of child1 to 30 secs old
child1.setLastModified(System.currentTimeMillis() - (1000 * 30))

val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
assert(result.size.equals(1))
assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
// although child1 is old, child2 is still new so return true
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

child2.setLastModified(System.currentTimeMillis - (1000 * 30))
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

parent.setLastModified(System.currentTimeMillis - (1000 * 30))
// although parent and its immediate children are new, child3 is still old
// we expect a full recursive search for new files.
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

child3.setLastModified(System.currentTimeMillis - (1000 * 30))
assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
}

test("resolveURI") {
Expand Down

0 comments on commit cf1d32e

Please sign in to comment.