diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 45f71d9927673..2758c26fa7ec6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -482,22 +482,26 @@ private[spark] class Master(
     // First schedule drivers, they take strict precedence over applications
     // Randomization helps balance drivers
-    val shuffledAliveWorkers = Random.shuffle(workers).toArray
-    val aliveWorkerNum = shuffledAliveWorkers.size
-    var curPos = aliveWorkerNum - 1
-    for (driver <- List(waitingDrivers: _*)) {
-      val startFlag = curPos
-      curPos = (curPos + 1) % aliveWorkerNum
+    val shuffledWorkers = Random.shuffle(workers).toArray
+    val workerNum = shuffledWorkers.size
+    var curPos = 0
+    for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers
+      // For each waiting driver, pick a worker that has enough resources to launch it.
+      // Workers are tried in a round-robin fashion, starting from the position just
+      // after the worker on which the previous driver was launched, and stopping once
+      // the driver is launched or every worker has been visited.
+      var numWorkersVisited = 0
       var launched = false
-      while (curPos != startFlag && !launched) {
-        val worker = shuffledAliveWorkers(curPos)
+      while (numWorkersVisited < workerNum && !launched) {
+        val worker = shuffledWorkers(curPos)
+        numWorkersVisited += 1
         if (worker.state == WorkerState.ALIVE && worker.memoryFree >= driver.desc.mem &&
           worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
           launched = true
         }
-        curPos = (curPos + 1) % aliveWorkerNum
+        curPos = (curPos + 1) % workerNum
       }
     }
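
For reviewers, here is a minimal, self-contained sketch of the round-robin placement loop this hunk implements, runnable outside the Master. `Worker`, `Driver`, and `launch` are hypothetical stand-ins for Spark's `WorkerInfo`, `DriverInfo`, and `Master.launchDriver`; the loop itself mirrors the scheduling logic in the diff.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object RoundRobinDriverScheduling {
  // Hypothetical stand-ins for Spark's WorkerInfo and DriverInfo/DriverDescription.
  case class Worker(id: String, var coresFree: Int, var memoryFree: Int)
  case class Driver(id: String, cores: Int, mem: Int)

  // Hypothetical stand-in for Master.launchDriver: reserve the driver's
  // resources on the chosen worker.
  def launch(worker: Worker, driver: Driver): Unit = {
    worker.coresFree -= driver.cores
    worker.memoryFree -= driver.mem
    println(s"launched ${driver.id} on ${worker.id}")
  }

  def main(args: Array[String]): Unit = {
    // Shuffling spreads drivers across workers over repeated scheduling rounds.
    val workers = Random.shuffle(Seq(
      Worker("w1", coresFree = 4, memoryFree = 4096),
      Worker("w2", coresFree = 2, memoryFree = 2048),
      Worker("w3", coresFree = 8, memoryFree = 8192))).toArray
    val waitingDrivers = ArrayBuffer(
      Driver("d1", cores = 2, mem = 1024),
      Driver("d2", cores = 4, mem = 4096))

    val workerNum = workers.length
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      var launched = false
      var numWorkersVisited = 0
      // Visit each worker at most once per driver, resuming the scan where the
      // previous driver's scan stopped (round-robin).
      while (numWorkersVisited < workerNum && !launched) {
        val worker = workers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          launch(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % workerNum
      }
    }
    println(s"still waiting: ${waitingDrivers.map(_.id).mkString(", ")}")
  }
}
```

Counting visited workers guarantees the scan terminates after exactly one full pass even when `curPos` wraps around the array, something a positional test such as `curPos != startPos` gets right only with careful off-by-one bookkeeping. It also lets each driver's scan resume just past wherever the previous one stopped, which is what spreads drivers across workers.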