Skip to content

Commit

Permalink
[SPARK-13803] restore the changes in SPARK-3411
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This patch contains the functionality to balance the load of the cluster-mode drivers among workers

This patch restores the changes in apache#1106 which was erased due to the merging of apache#731

## How was this patch tested?

test with existing test cases

Author: CodingCat <zhunansjtu@gmail.com>

Closes apache#11702 from CodingCat/SPARK-13803.
  • Loading branch information
CodingCat authored and roygao94 committed Mar 22, 2016
1 parent 74e8e7b commit b1e196b
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -727,15 +727,28 @@ private[deploy] class Master(
* every time a new app joins or resource availability changes.
*/
private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) { return }
if (state != RecoveryState.ALIVE) {
return
}
// Drivers take strict precedence over executors
val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
for (driver <- waitingDrivers) {
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
startExecutorsOnWorkers()
Expand Down

0 comments on commit b1e196b

Please sign in to comment.