SPARK-1582 Invoke Thread.interrupt() when cancelling jobs
Sometimes executor threads are blocked waiting for IO or monitors, and the current implementation of job cancellation may never recover these threads. By simply invoking Thread.interrupt() during cancellation, we can often safely unblock the threads and use them for subsequent work.

Note that this feature must remain optional for now because of a bug in HDFS (HDFS-1208) where Thread.interrupt() may cause nodes to be marked as permanently dead, as the InterruptedException is reinterpreted as an IOException during communication with a node.
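
As a rough usage sketch of the new API (the group id, description, and the slowLookup function below are made up for illustration and are not part of this commit), a driver opts in per job group via the new setJobGroup parameter and can then cancel the whole group from another thread:

    // Opt in to Thread.interrupt() on cancellation for all jobs submitted in this group.
    sc.setJobGroup("some_job_to_cancel", "job that may block on IO", interruptOnCancel = true)

    // Hypothetical long-running action whose tasks may block inside slowLookup.
    val result = sc.parallelize(1 to 100000).map(slowLookup).count()

    // In a separate thread: cancels all running jobs in the group. Because interruptOnCancel
    // was set, the executor threads currently running those tasks also receive Thread.interrupt().
    sc.cancelJobGroup("some_job_to_cancel")

With interruptOnCancel left at its default of false, cancellation behaves as before: the task's interrupted flag is set, but the executor thread is never interrupted.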

Author: Aaron Davidson <aaron@databricks.com>

Closes apache#498 from aarondav/cancel and squashes the following commits:

e52b829 [Aaron Davidson] Don't use job.properties when null
82f78bb [Aaron Davidson] Update DAGSchedulerSuite
b67f472 [Aaron Davidson] Add comment on why interruptOnCancel is in setJobGroup
4cb9fd6 [Aaron Davidson] SPARK-1582 Invoke Thread.interrupt() when cancelling jobs

Conflicts:
	core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
	core/src/main/scala/org/apache/spark/executor/Executor.scala
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
	core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
	core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
	core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
	core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
aarondav authored and markhamstra committed Apr 24, 2014
1 parent e794d53 commit f96d67d
Showing 11 changed files with 48 additions and 20 deletions.
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -355,16 +355,27 @@ class SparkContext(
* // In a separate thread:
* sc.cancelJobGroup("some_job_to_cancel")
* }}}
*
* If interruptOnCancel is set to true for the job group, then job cancellation will result
* in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
* that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
* where HDFS may respond to Thread.interrupt() by marking nodes as dead.
*/
def setJobGroup(groupId: String, description: String) {
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
// Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids
// changing several public APIs and allows Spark cancellations outside of the cancelJobGroup
// APIs to also take advantage of this property (e.g., internal job failures or canceling from
// JobProgressTab UI) on a per-job basis.
setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
}

/** Clear the job group id and its description. */
def clearJobGroup() {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
}

// Post init
@@ -1022,6 +1033,8 @@ object SparkContext {

private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"

private[spark] val SPARK_UNKNOWN_USER = "<unknown>"

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -69,12 +69,12 @@ private[spark] class CoarseGrainedExecutorBackend(
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}

case KillTask(taskId, _) =>
case KillTask(taskId, _, interruptThread) =>
if (executor == null) {
logError("Received KillTask command but executor was null")
System.exit(1)
} else {
executor.killTask(taskId)
executor.killTask(taskId, interruptThread)
}

case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -138,10 +138,10 @@ private[spark] class Executor(
threadPool.execute(tr)
}

def killTask(taskId: Long) {
def killTask(taskId: Long, interruptThread: Boolean) {
val tr = runningTasks.get(taskId)
if (tr != null) {
tr.kill()
tr.kill(interruptThread)
}
}

@@ -166,11 +166,11 @@
@volatile private var killed = false
@volatile private var task: Task[Any] = _

def kill() {
def kill(interruptThread: Boolean) {
logInfo("Executor is trying to kill task " + taskId)
killed = true
if (task != null) {
task.kill()
task.kill(interruptThread)
}
}

core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -78,7 +78,8 @@ private[spark] class MesosExecutorBackend
if (executor == null) {
logError("Received KillTask but executor was null")
} else {
executor.killTask(t.getValue.toLong)
// TODO: Determine the 'interruptOnCancel' property set for the given job.
executor.killTask(t.getValue.toLong, interruptThread = false)
}
}

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -963,7 +963,10 @@ class DAGScheduler(
logDebug("Trying to cancel unregistered job " + jobId)
} else {
val independentStages = removeJobAndIndependentStages(jobId)
independentStages.foreach { taskSched.cancelTasks }
val shouldInterruptThread =
if (job.properties == null) false
else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean
independentStages.foreach { taskSched.cancelTasks(_, shouldInterruptThread) }
val error = new SparkException("Job %d cancelled".format(jobId))
val job = idToActiveJob(jobId)
job.listener.jobFailed(error)
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -47,8 +47,9 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex

final def run(attemptId: Long): T = {
context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
taskThread = Thread.currentThread()
if (_killed) {
kill()
kill(interruptThread = false)
}
runTask(context)
}
@@ -65,6 +66,9 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
// Task context, to be initialized in run().
@transient protected var context: TaskContext = _

// The actual Thread on which the task is running, if any. Initialized in run().
@volatile @transient private var taskThread: Thread = _

// A flag to indicate whether the task is killed. This is used in case context is not yet
// initialized when kill() is invoked.
@volatile @transient private var _killed = false
@@ -78,12 +82,16 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
* Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
* code and user code to properly handle the flag. This function should be idempotent so it can
* be called multiple times.
* If interruptThread is true, we will also call Thread.interrupt() on the Task's executor thread.
*/
def kill() {
def kill(interruptThread: Boolean) {
_killed = true
if (context != null) {
context.interrupted = true
}
if (interruptThread && taskThread != null) {
taskThread.interrupt()
}
}
}

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -45,7 +45,7 @@ private[spark] trait TaskScheduler {
def submitTasks(taskSet: TaskSet): Unit

// Cancel a stage.
def cancelTasks(stageId: Int)
def cancelTasks(stageId: Int, interruptThread: Boolean)

// Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
def setDAGScheduler(dagScheduler: DAGScheduler): Unit
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -31,8 +31,8 @@ private[spark] class TaskSet(
val properties: Properties) {
val id: String = stageId + "." + attempt

def kill() {
tasks.foreach(_.kill())
def kill(interruptThread: Boolean) {
tasks.foreach(_.kill(interruptThread))
}

override def toString: String = "TaskSet " + id
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessages.scala
@@ -31,7 +31,8 @@ private[spark] object CoarseGrainedClusterMessages {
// Driver to executors
case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage

case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
extends CoarseGrainedClusterMessage

case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
extends CoarseGrainedClusterMessage
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -100,8 +100,8 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
case ReviveOffers =>
makeOffers()

case KillTask(taskId, executorId) =>
executorActor(executorId) ! KillTask(taskId, executorId)
case KillTask(taskId, executorId, interruptThread) =>
executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)

case StopDriver =>
sender ! true
@@ -215,8 +215,8 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
driverActor ! ReviveOffers
}

override def killTask(taskId: Long, executorId: String) {
driverActor ! KillTask(taskId, executorId)
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
driverActor ! KillTask(taskId, executorId, interruptThread)
}

override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -59,7 +59,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
taskSets += taskSet
}
override def cancelTasks(stageId: Int) {}
override def cancelTasks(stageId: Int, interruptThread: Boolean) {
cancelledStages += stageId
}
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
}
