From b1e196b575572c85643739f7f532d3eac03e6838 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 15 Mar 2016 10:10:23 +0000 Subject: [PATCH] [SPARK-13803] restore the changes in SPARK-3411 ## What changes were proposed in this pull request? This patch contains the functionality to balance the load of the cluster-mode drivers among workers This patch restores the changes in https://github.com/apache/spark/pull/1106 which was erased due to the merging of https://github.com/apache/spark/pull/731 ## How was this patch tested? test with existing test cases Author: CodingCat Closes #11702 from CodingCat/SPARK-13803. --- .../apache/spark/deploy/master/Master.scala | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 6b9b1408ee44e..c97ad4d72350a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -727,15 +727,28 @@ private[deploy] class Master( * every time a new app joins or resource availability changes. */ private def schedule(): Unit = { - if (state != RecoveryState.ALIVE) { return } + if (state != RecoveryState.ALIVE) { + return + } // Drivers take strict precedence over executors - val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers - for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { - for (driver <- waitingDrivers) { + val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) + val numWorkersAlive = shuffledAliveWorkers.size + var curPos = 0 + for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers + // We assign workers to each waiting driver in a round-robin fashion. For each driver, we + // start from the last worker that was assigned a driver, and continue onwards until we have + // explored all alive workers. + var launched = false + var numWorkersVisited = 0 + while (numWorkersVisited < numWorkersAlive && !launched) { + val worker = shuffledAliveWorkers(curPos) + numWorkersVisited += 1 if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver + launched = true } + curPos = (curPos + 1) % numWorkersAlive } } startExecutorsOnWorkers()