Skip to content

Commit

Permalink
SPARK-4136. Under dynamic allocation, cancel outstanding executor req…
Browse files Browse the repository at this point in the history
…uests when no longer needed

This takes advantage of the changes made in SPARK-4337 to cancel pending requests to YARN when they are no longer needed.

Each time the timer in `ExecutorAllocationManager` strikes, we compute `maxNumNeededExecutors`, the maximum number of executors we could fill with the current load.  This is calculated as the total number of running and pending tasks divided by the number of cores per executor.  If `maxNumNeededExecutors` is below the total number of running and pending executors, we call `requestTotalExecutors(maxNumNeededExecutors)` to let the cluster manager know that it should cancel any pending requests above this amount.  If not, `maxNumNeededExecutors` is just used as a bound in alongside the configured `maxExecutors` to limit the number of new requests.

The patch modifies the API exposed by `ExecutorAllocationClient` for requesting additional executors by moving from `requestExecutors` to `requestTotalExecutors`.  This makes the communication between the `ExecutorAllocationManager` and the `YarnAllocator` easier to reason about and removes some state that needed to be kept in the `CoarseGrainedSchedulerBackend`.  I think an argument can be made that this makes for a less attractive user-facing API in `SparkContext`, but I'm having trouble envisioning situations where a user would want to use either of these APIs.

This will likely break some tests, but I wanted to get feedback on the approach before adding tests and polishing.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #4168 from sryza/sandy-spark-4136 and squashes the following commits:

37ce77d [Sandy Ryza] Warn on negative number
cd3b2ff [Sandy Ryza] SPARK-4136

(cherry picked from commit 69bc3bb)
Signed-off-by: Andrew Or <andrew@databricks.com>
  • Loading branch information
sryza authored and Andrew Or committed Feb 10, 2015
1 parent e508237 commit e53da21
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,17 @@ package org.apache.spark

/**
* A client that communicates with the cluster manager to request or kill executors.
* This is currently supported only in YARN mode.
*/
private[spark] trait ExecutorAllocationClient {

/**
* Express a preference to the cluster manager for a given total number of executors.
* This can result in canceling pending requests or filing additional requests.
* Return whether the request is acknowledged by the cluster manager.
*/
private[spark] def requestTotalExecutors(numExecutors: Int): Boolean

/**
* Request an additional number of executors from the cluster manager.
* Return whether the request is acknowledged by the cluster manager.
Expand Down
149 changes: 104 additions & 45 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,34 @@ private[spark] class ExecutorAllocationManager(
}

/**
* If the add time has expired, request new executors and refresh the add time.
* If the remove time for an existing executor has expired, kill the executor.
* The number of executors we would have if the cluster manager were to fulfill all our existing
* requests.
*/
private def targetNumExecutors(): Int =
numExecutorsPending + executorIds.size - executorsPendingToRemove.size

/**
* The maximum number of executors we would need under the current load to satisfy all running
* and pending tasks, rounded up.
*/
private def maxNumExecutorsNeeded(): Int = {
val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
(numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}

/**
* This is called at a fixed interval to regulate the number of pending executor requests
* and number of executors running.
*
* First, adjust our requested executors based on the add time and our current needs.
* Then, if the remove time for an existing executor has expired, kill the executor.
*
* This is factored out into its own method for testing.
*/
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
if (addTime != NOT_SET && now >= addTime) {
addExecutors()
logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeout seconds)")
addTime += sustainedSchedulerBacklogTimeout * 1000
}

addOrCancelExecutorRequests(now)

removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
Expand All @@ -223,59 +239,89 @@ private[spark] class ExecutorAllocationManager(
}
}

/**
* Check to see whether our existing allocation and the requests we've made previously exceed our
* current needs. If so, let the cluster manager know so that it can cancel pending requests that
* are unneeded.
*
* If not, and the add time has expired, see if we can request new executors and refresh the add
* time.
*
* @return the delta in the target number of executors.
*/
private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
val currentTarget = targetNumExecutors
val maxNeeded = maxNumExecutorsNeeded

if (maxNeeded < currentTarget) {
// The target number exceeds the number we actually need, so stop adding new
// executors and inform the cluster manager to cancel the extra pending requests.
val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
client.requestTotalExecutors(newTotalExecutors)
numExecutorsToAdd = 1
updateNumExecutorsPending(newTotalExecutors)
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeout seconds)")
addTime += sustainedSchedulerBacklogTimeout * 1000
delta
} else {
0
}
}

/**
* Request a number of executors from the cluster manager.
* If the cap on the number of executors is reached, give up and reset the
* number of executors to add next round instead of continuing to double it.
* Return the number actually requested.
*
* @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending
* tasks could fill
* @return the number of additional executors actually requested.
*/
private def addExecutors(): Int = synchronized {
// Do not request more executors if we have already reached the upper bound
val numExistingExecutors = executorIds.size + numExecutorsPending
if (numExistingExecutors >= maxNumExecutors) {
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
// Do not request more executors if it would put our target over the upper bound
val currentTarget = targetNumExecutors
if (currentTarget >= maxNumExecutors) {
logDebug(s"Not adding executors because there are already ${executorIds.size} " +
s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
numExecutorsToAdd = 1
return 0
}

// The number of executors needed to satisfy all pending tasks is the number of tasks pending
// divided by the number of tasks each executor can fit, rounded up.
val maxNumExecutorsPending =
(listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
if (numExecutorsPending >= maxNumExecutorsPending) {
logDebug(s"Not adding executors because there are already $numExecutorsPending " +
s"pending and pending tasks could only fill $maxNumExecutorsPending")
numExecutorsToAdd = 1
return 0
}

// It's never useful to request more executors than could satisfy all the pending tasks, so
// cap request at that amount.
// Also cap request with respect to the configured upper bound.
val maxNumExecutorsToAdd = math.min(
maxNumExecutorsPending - numExecutorsPending,
maxNumExecutors - numExistingExecutors)
assert(maxNumExecutorsToAdd > 0)

val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)

val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
if (addRequestAcknowledged) {
logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
s"tasks are backlogged (new desired total will be $newTotalExecutors)")
numExecutorsToAdd =
if (actualNumExecutorsToAdd == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
numExecutorsPending += actualNumExecutorsToAdd
actualNumExecutorsToAdd
val delta = updateNumExecutorsPending(newTotalExecutors)
logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
s" (new desired total will be $newTotalExecutors)")
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
numExecutorsToAdd * 2
} else {
1
}
delta
} else {
logWarning(s"Unable to reach the cluster manager " +
s"to request $actualNumExecutorsToAdd executors!")
logWarning(
s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
0
}
}

/**
* Given the new target number of executors, update the number of pending executor requests,
* and return the delta from the old number of pending requests.
*/
private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
val newNumExecutorsPending =
newTotalExecutors - executorIds.size + executorsPendingToRemove.size
val delta = newNumExecutorsPending - numExecutorsPending
numExecutorsPending = newNumExecutorsPending
delta
}

/**
* Request the cluster manager to remove the given executor.
* Return whether the request is received.
Expand Down Expand Up @@ -415,6 +461,8 @@ private[spark] class ExecutorAllocationManager(
private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
// Number of tasks currently running on the cluster. Should be 0 when no stages are active.
private var numRunningTasks: Int = _

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
val stageId = stageSubmitted.stageInfo.stageId
Expand All @@ -435,6 +483,10 @@ private[spark] class ExecutorAllocationManager(
// This is needed in case the stage is aborted for any reason
if (stageIdToNumTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
if (numRunningTasks != 0) {
logWarning("No stages are running, but numRunningTasks != 0")
numRunningTasks = 0
}
}
}
}
Expand All @@ -446,6 +498,7 @@ private[spark] class ExecutorAllocationManager(
val executorId = taskStart.taskInfo.executorId

allocationManager.synchronized {
numRunningTasks += 1
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
// possible because these events are posted in different threads. (see SPARK-4951)
Expand Down Expand Up @@ -475,7 +528,8 @@ private[spark] class ExecutorAllocationManager(
val executorId = taskEnd.taskInfo.executorId
val taskId = taskEnd.taskInfo.taskId
allocationManager.synchronized {
// If the executor is no longer running scheduled any tasks, mark it as idle
numRunningTasks -= 1
// If the executor is no longer running any scheduled tasks, mark it as idle
if (executorIdToTaskIds.contains(executorId)) {
executorIdToTaskIds(executorId) -= taskId
if (executorIdToTaskIds(executorId).isEmpty) {
Expand Down Expand Up @@ -514,6 +568,11 @@ private[spark] class ExecutorAllocationManager(
}.sum
}

/**
* The number of tasks currently running across all stages.
*/
def totalRunningTasks(): Int = numRunningTasks

/**
* Return true if an executor is not currently running a task, and false otherwise.
*
Expand Down
21 changes: 19 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1103,10 +1103,27 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
listenerBus.addListener(listener)
}

/**
* Express a preference to the cluster manager for a given total number of executors.
* This can result in canceling pending requests or filing additional requests.
* This is currently only supported in YARN mode. Return whether the request is received.
*/
private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
assert(master.contains("yarn") || dynamicAllocationTesting,
"Requesting executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestTotalExecutors(numExecutors)
case _ =>
logWarning("Requesting executors is only supported in coarse-grained mode")
false
}
}

/**
* :: DeveloperApi ::
* Request an additional number of executors from the cluster manager.
* This is currently only supported in Yarn mode. Return whether the request is received.
* This is currently only supported in YARN mode. Return whether the request is received.
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
Expand All @@ -1124,7 +1141,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* :: DeveloperApi ::
* Request that the cluster manager kill the specified executors.
* This is currently only supported in Yarn mode. Return whether the request is received.
* This is currently only supported in YARN mode. Return whether the request is received.
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste

/**
* Request an additional number of executors from the cluster manager.
* Return whether the request is acknowledged.
* @return whether the request is acknowledged.
*/
final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
if (numAdditionalExecutors < 0) {
Expand All @@ -327,6 +327,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
doRequestTotalExecutors(newTotal)
}

/**
* Express a preference to the cluster manager for a given total number of executors. This can
* result in canceling pending requests or filing additional requests.
* @return whether the request is acknowledged.
*/
final override def requestTotalExecutors(numExecutors: Int): Boolean = synchronized {

This comment has been minimized.

Copy link
@mengxr

mengxr Feb 10, 2015

Contributor

Does it compile?

This comment has been minimized.

Copy link
@andrewor14

andrewor14 Feb 10, 2015

Contributor

this is fixed in 4e3aa68 in 1.3 and b640c84 in master

if (numAdditionalExecutors < 0) {
throw new IllegalArgumentException(
"Attempted to request a negative number of executor(s) " +
s"$numExecutors from the cluster manager. Please specify a positive number!")
}
numPendingExecutors =
math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
doRequestTotalExecutors(numExecutors)
}

/**
* Request executors from the cluster manager by specifying the total number desired,
* including existing pending and running executors.
Expand All @@ -337,7 +353,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
* insufficient resources to satisfy the first request. We make the assumption here that the
* cluster manager will eventually fulfill all requests when resources free up.
*
* Return whether the request is acknowledged.
* @return whether the request is acknowledged.
*/
protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,33 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(numExecutorsPending(manager) === 9)
}

test("cancel pending executors when no longer needed") {
sc = createSparkContext(1, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))

assert(numExecutorsPending(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 1)
assert(numExecutorsPending(manager) === 1)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
assert(numExecutorsPending(manager) === 3)

val task1Info = createTaskInfo(0, 0, "executor-1")
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info))

assert(numExecutorsToAdd(manager) === 4)
assert(addExecutors(manager) === 2)

val task2Info = createTaskInfo(1, 0, "executor-1")
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info))
sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task1Info, null))
sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task2Info, null))

assert(adjustRequestedExecutors(manager) === -1)
}

test("remove executors") {
sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
Expand Down Expand Up @@ -679,13 +706,15 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {

private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd)
private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending)
private val _maxNumExecutorsNeeded = PrivateMethod[Int]('maxNumExecutorsNeeded)
private val _executorsPendingToRemove =
PrivateMethod[collection.Set[String]]('executorsPendingToRemove)
private val _executorIds = PrivateMethod[collection.Set[String]]('executorIds)
private val _addTime = PrivateMethod[Long]('addTime)
private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes)
private val _schedule = PrivateMethod[Unit]('schedule)
private val _addExecutors = PrivateMethod[Int]('addExecutors)
private val _addOrCancelExecutorRequests = PrivateMethod[Int]('addOrCancelExecutorRequests)
private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
Expand Down Expand Up @@ -724,7 +753,12 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
}

private def addExecutors(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _addExecutors()
val maxNumExecutorsNeeded = manager invokePrivate _maxNumExecutorsNeeded()
manager invokePrivate _addExecutors(maxNumExecutorsNeeded)
}

private def adjustRequestedExecutors(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _addOrCancelExecutorRequests(0L)
}

private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = {
Expand Down

0 comments on commit e53da21

Please sign in to comment.