Skip to content

Commit

Permalink
[RFC] SPARK-1772 Stop catching Throwable, let Executors die
Browse files Browse the repository at this point in the history
The main issue this patch fixes is [SPARK-1772](https://issues.apache.org/jira/browse/SPARK-1772),
in which Executors may not die when fatal exceptions (e.g., OOM) are thrown. This patch causes
Executors to delegate to the ExecutorUncaughtExceptionHandler when a fatal exception is thrown.

This patch also continues the fight in the never-ending war against `case t: Throwable =>`,
by catching only Exceptions in many places, and by adding a wrapper for Threads and Runnables
to make sure any uncaught exceptions are at least printed to the logs.

Testing ([here](https://gist.github.com/aarondav/ca1f0cdcd50727f89c0d)) also suggests that the
IndestructibleActorSystem is unlikely to actually work: the uncaughtExceptionHandler is not
called from the places where we expected it to be.
[SPARK-1620](https://issues.apache.org/jira/browse/SPARK-1620) deals with part of this
issue, but refactoring our Actor Systems to ensure that exceptions are dealt with properly
is a much bigger change, outside the scope of this PR.
  • Loading branch information
aarondav committed May 9, 2014
1 parent 191279c commit 1867867
Show file tree
Hide file tree
Showing 19 changed files with 142 additions and 148 deletions.
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

/**
* Classes that represent cleaning tasks.
Expand Down Expand Up @@ -110,7 +111,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning() {
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
Expand All @@ -128,7 +129,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}
} catch {
case t: Throwable => logError("Error in cleaning thread", t)
case e: Exception => logError("Error in cleaning thread", e)
}
}
}
Expand All @@ -141,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.rddCleaned(rddId))
logInfo("Cleaned RDD " + rddId)
} catch {
case t: Throwable => logError("Error cleaning RDD " + rddId, t)
case e: Exception => logError("Error cleaning RDD " + rddId, e)
}
}

Expand All @@ -154,7 +155,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.shuffleCleaned(shuffleId))
logInfo("Cleaned shuffle " + shuffleId)
} catch {
case t: Throwable => logError("Error cleaning shuffle " + shuffleId, t)
case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
}
}

Expand All @@ -166,7 +167,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.broadcastCleaned(broadcastId))
logInfo("Cleaned broadcast " + broadcastId)
} catch {
case t: Throwable => logError("Error cleaning broadcast " + broadcastId, t)
case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
}
}

Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1494,8 +1494,8 @@ object SparkContext extends Logging {
} catch {
// TODO: Enumerate the exact reasons why it can fail
// But irrespective of it, it means we cannot proceed !
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
Expand All @@ -1510,8 +1510,8 @@ object SparkContext extends Logging {
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

} catch {
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}

Expand All @@ -1521,8 +1521,8 @@ object SparkContext extends Logging {
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ private[spark] class PythonRDD[T: ClassTag](
this.interrupt()
}

override def run() {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
SparkEnv.set(env)
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
Expand Down Expand Up @@ -281,7 +281,7 @@ private[spark] object PythonRDD {
}
} catch {
case eof: EOFException => {}
case e: Throwable => throw e
case e: Exception => throw e
}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
stopDaemon()
startDaemon()
new Socket(daemonHost, daemonPort)
case e: Throwable => throw e
case e: Exception => throw e
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ object Client {
// TODO: See if we can initialize akka so return messages are sent back using the same TCP
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ object SparkHadoopUtil {
.newInstance()
.asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
case e: Exception => throw new SparkException("Unable to load YARN support", e)
}
} else {
new SparkHadoopUtil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class HistoryServer(
* TODO: Add a mechanism to update manually.
*/
private val logCheckingThread = new Thread {
override def run() {
override def run(): Unit = Utils.logUncaughtExceptions {
while (!stopped) {
val now = System.currentTimeMillis
if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
Expand Down Expand Up @@ -154,7 +154,7 @@ class HistoryServer(
numCompletedApplications = logInfos.size

} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
case e: Exception => logError("Exception in checking for event log updates", e)
}
} else {
logWarning("Attempted to check for event log updates before binding the server.")
Expand Down Expand Up @@ -231,8 +231,8 @@ class HistoryServer(
dir.getModificationTime
}
} catch {
case t: Throwable =>
logError("Exception in accessing modification time of %s".format(dir.getPath), t)
case e: Exception =>
logError("Exception in accessing modification time of %s".format(dir.getPath), e)
-1L
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,8 +675,8 @@ private[spark] class Master(
webUi.attachSparkUI(ui)
return true
} catch {
case t: Throwable =>
logError("Exception in replaying log for application %s (%s)".format(appName, app.id), t)
case e: Exception =>
logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e)
}
} else {
logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ object DriverWrapper {
case workerUrl :: mainClass :: extraArgs =>
val conf = new SparkConf()
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
Utils.localHostName(), 0, conf, new SecurityManager(conf))
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")

// Delegate to supplied main class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a
// SparkEnv / Executor before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
indestructible = true, conf = conf, new SecurityManager(conf))
conf, new SecurityManager(conf))
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
Expand Down
58 changes: 24 additions & 34 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,7 @@ private[spark] class Executor(
// Setup an uncaught exception handler for non-local mode.
// Make any thread terminations due to uncaught exceptions kill the entire
// executor process to avoid surprising stalls.
Thread.setDefaultUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler {
override def uncaughtException(thread: Thread, exception: Throwable) {
try {
logError("Uncaught exception in thread " + thread, exception)

// We may have been called from a shutdown hook. If so, we must not call System.exit().
// (If we do, we will deadlock.)
if (!Utils.inShutdown()) {
if (exception.isInstanceOf[OutOfMemoryError]) {
System.exit(ExecutorExitCode.OOM)
} else {
System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
}
}
} catch {
case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
}
}
)
Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
}

val executorSource = new ExecutorSource(this, executorId)
Expand Down Expand Up @@ -259,19 +238,30 @@ private[spark] class Executor(
}

case t: Throwable => {
val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
for (m <- metrics) {
m.executorRunTime = serviceTime
m.jvmGCTime = gcTime - startGCTime
}
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
// Attempt to exit cleanly by informing the driver of our failure.
// If anything goes wrong (or this was a fatal exception), we will delegate to
// the default uncaught exception handler, which will terminate the Executor.
try {
logError("Exception in task ID " + taskId, t)

val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
for (m <- metrics) {
m.executorRunTime = serviceTime
m.jvmGCTime = gcTime - startGCTime
}
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

// TODO: Should we exit the whole executor here? On the one hand, the failed task may
// have left some weird state around depending on when the exception was thrown, but on
// the other hand, maybe we could detect that when future tasks fail and exit then.
logError("Exception in task ID " + taskId, t)
// Don't forcibly exit unless the exception was inherently fatal, to avoid
// stopping other tasks unnecessarily.
if (Utils.isFatalError(t)) {
ExecutorUncaughtExceptionHandler.uncaughtException(t)
}
} catch {
case t2: Throwable =>
ExecutorUncaughtExceptionHandler.uncaughtException(t2)
}
}
} finally {
// TODO: Unregister shuffle memory only for ResultTask
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import org.apache.spark.Logging
import org.apache.spark.util.Utils

/**
 * Uncaught exception handler for Executors.
 *
 * An Executor that has hit an uncaught exception is brought down as a whole process instead of
 * being left running in a bad state indefinitely. Executors are relatively lightweight, so
 * failing fast is preferred when things go wrong.
 */
private[spark] object ExecutorUncaughtExceptionHandler
  extends Thread.UncaughtExceptionHandler with Logging {

  /**
   * Logs the exception and, unless the JVM is already shutting down, exits the process with
   * an exit code that distinguishes OutOfMemoryError from any other uncaught exception.
   *
   * @param thread    the thread in which the exception went uncaught
   * @param exception the uncaught exception
   */
  override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
    try {
      logError("Uncaught exception in thread " + thread, exception)

      // This handler may itself be running inside a shutdown hook; calling System.exit()
      // from there would deadlock, so only exit when no shutdown is in progress.
      if (!Utils.inShutdown()) {
        val exitCode = exception match {
          case _: OutOfMemoryError => ExecutorExitCode.OOM
          case _ => ExecutorExitCode.UNCAUGHT_EXCEPTION
        }
        System.exit(exitCode)
      }
    } catch {
      // Anything thrown while handling the original failure: halt the JVM immediately.
      case _: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
      case _: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
    }
  }

  /**
   * Convenience overload that reports the exception on behalf of the calling thread.
   *
   * @param exception the uncaught exception
   */
  def uncaughtException(exception: Throwable): Unit =
    uncaughtException(Thread.currentThread(), exception)
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ private[spark] object EventLoggingListener extends Logging {
applicationComplete = filePaths.exists { path => isApplicationCompleteFile(path.getName) }
)
} catch {
case t: Throwable =>
logError("Exception in parsing logging info from directory %s".format(logDir), t)
case e: Exception =>
logError("Exception in parsing logging info from directory %s".format(logDir), e)
EventLoggingInfo.empty
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
def enqueueSuccessfulTask(
taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
getTaskResultExecutor.execute(new Runnable {
override def run() {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
case directResult: DirectTaskResult[_] => directResult
Expand All @@ -70,7 +70,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread.getContextClassLoader
taskSetManager.abort("ClassNotFound with classloader: " + loader)
case ex: Throwable =>
case ex: Exception =>
taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
}
}
Expand All @@ -81,7 +81,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
serializedData: ByteBuffer) {
var reason : TaskEndReason = UnknownReason
getTaskResultExecutor.execute(new Runnable {
override def run() {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
if (serializedData != null && serializedData.limit() > 0) {
reason = serializer.get().deserialize[TaskEndReason](
Expand All @@ -94,7 +94,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
val loader = Utils.getContextOrSparkClassLoader
logError(
"Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
case ex: Throwable => {}
case ex: Exception => {}
}
scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
private def addShutdownHook() {
localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
override def run() {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
DiskBlockManager.this.stop()
}
Expand All @@ -162,8 +162,8 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
try {
if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
} catch {
case t: Throwable =>
logError("Exception while deleting local spark dir: " + localDir, t)
case e: Exception =>
logError("Exception while deleting local spark dir: " + localDir, e)
}
}
}
Expand Down
Loading

0 comments on commit 1867867

Please sign in to comment.