Skip to content

Commit

Permalink
add comment and minor fix
Browse files Browse the repository at this point in the history
  • Loading branch information
WangTaoTheTonic committed Sep 9, 2014
1 parent 2835929 commit f674e59
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -482,22 +482,26 @@ private[spark] class Master(

// First schedule drivers, they take strict precedence over applications
// Randomization helps balance drivers
val shuffledAliveWorkers = Random.shuffle(workers).toArray
val aliveWorkerNum = shuffledAliveWorkers.size
var curPos = aliveWorkerNum - 1
for (driver <- List(waitingDrivers: _*)) {
val startFlag = curPos
curPos = (curPos + 1) % aliveWorkerNum
val shuffledWorkers = Random.shuffle(workers).toArray
val workerNum = shuffledWorkers.size
var curPos = 0
for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers
// For each waiting driver we pick a worker that has enough resources to launch it.
// The picking does in a round-robin fashion, starting from position behind last
// worker on which driver was just launched and ending with driver being launched
// or we have iterated over all workers.
val startPos = curPos
curPos = (curPos + 1) % workerNum
var launched = false
while (curPos != startFlag && !launched) {
val worker = shuffledAliveWorkers(curPos)
while (curPos - 1 != startPos && !launched) {
val worker = shuffledWorkers(curPos)
if (worker.state == WorkerState.ALIVE
&& worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % aliveWorkerNum
curPos = (curPos + 1) % workerNum
}
}

Expand Down

0 comments on commit f674e59

Please sign in to comment.