[SPARK-979] an LRU scheduler for load balancing in TaskSchedulerImpl #7

Closed · wants to merge 2 commits
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -21,14 +21,15 @@ import java.nio.ByteBuffer
import java.util.{TimerTask, Timer}
import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.duration._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.collection.mutable.{HashMap, HashSet, Map}
import scala.concurrent.duration._


import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.collection.LRUMap

/**
* Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -195,11 +196,13 @@ private[spark] class TaskSchedulerImpl(
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
* that tasks are balanced across the cluster.
*/
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
def resourceOffers(execWorkerOffers: Map[String, WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
SparkEnv.set(sc.env)

val workerOffers = execWorkerOffers.values

// Mark each slave as alive and remember its hostname
for (o <- offers) {
for (o <- workerOffers) {
executorIdToHost(o.executorId) = o.host
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
@@ -208,8 +211,8 @@
}

// Build a list of tasks to assign to each worker
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = offers.map(o => o.cores).toArray
val tasks = workerOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)).toArray
val availableCpus = workerOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue()
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -222,16 +225,20 @@
for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
do {
launchedTask = false
for (i <- 0 until offers.size) {
val execId = offers(i).executorId
val host = offers(i).host
for (i <- 0 until workerOffers.size) {
// Peek at the head offer; head access does not reorder the LRU map
val candidateOffer = execWorkerOffers.head._2
val execId = candidateOffer.executorId
val host = candidateOffer.host
for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
tasks(i) += task
val tid = task.taskId
// Accessing the entry via get() marks this executor as most recently used
val selectedExecId = execWorkerOffers.get(execWorkerOffers.head._1).get.executorId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
taskIdToExecutorId(tid) = selectedExecId
activeExecutorIds += selectedExecId
executorsByHost(host) += selectedExecId
availableCpus(i) -= 1
launchedTask = true
}
@@ -458,4 +465,18 @@ private[spark] object TaskSchedulerImpl {

retval.toList
}


def buildMapFromWorkerOffers(scheduler: TaskSchedulerImpl, workerOffers: Seq[WorkerOffer]) = {
val map = {
if (scheduler.conf.getBoolean("spark.scheduler.lruOffer", false)) {
new LRUMap[String, WorkerOffer](1000)
}
else {
new HashMap[String, WorkerOffer]
}
}
for (workerOffer <- workerOffers) map += workerOffer.executorId -> workerOffer
map
}
}
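
For context, a minimal sketch of how a backend is expected to use the new buildMapFromWorkerOffers helper together with the map-based resourceOffers signature above. The executor IDs, hosts, and core counts are illustrative only, and since TaskSchedulerImpl is private[spark], such code would have to live inside the Spark source tree:

```scala
package org.apache.spark.scheduler

// Illustrative sketch, not part of the diff: build an offer map (LRU-ordered when
// spark.scheduler.lruOffer is true, a plain HashMap otherwise) and pass it to
// resourceOffers(), as the backend changes further down in this PR do.
object LruOfferSketch {
  def offerToScheduler(scheduler: TaskSchedulerImpl): Seq[Seq[TaskDescription]] = {
    val offers = Seq(
      new WorkerOffer("exec1", "host1", 4),  // hypothetical executors
      new WorkerOffer("exec2", "host2", 4))
    val offerMap = TaskSchedulerImpl.buildMapFromWorkerOffers(scheduler, offers)
    scheduler.resourceOffers(offerMap)
  }
}
```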
core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
@@ -21,4 +21,4 @@ package org.apache.spark.scheduler
* Represents free resources available on an executor.
*/
private[spark]
class WorkerOffer(val executorId: String, val host: String, val cores: Int)
class WorkerOffer(val executorId: String, val host: String, var cores: Int)
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -31,6 +31,7 @@ import org.apache.spark.{Logging, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.collection.LRUMap

/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -50,12 +51,23 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
private val timeout = AkkaUtils.askTimeout(conf)

class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {

private val schedulingMode = conf.getBoolean("spark.scheduler.lruOffer", false)

private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
private val executorHost = new HashMap[String, String]
private val freeCores = new HashMap[String, Int]
private val executorWorkerOffer = {
if (schedulingMode) {
new LRUMap[String, WorkerOffer](1000)
}
else {
new HashMap[String, WorkerOffer]
}
}
private val addressToExecutorId = new HashMap[Address, String]


override def preStart() {
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -76,7 +88,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
sender ! RegisteredExecutor(sparkProperties)
executorActor(executorId) = sender
executorHost(executorId) = Utils.parseHostPort(hostPort)._1
freeCores(executorId) = cores
executorWorkerOffer(executorId) = new WorkerOffer(executorId, executorHost(executorId), cores)
executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
@@ -87,7 +99,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
if (executorActor.contains(executorId)) {
freeCores(executorId) += 1
executorWorkerOffer(executorId).cores += 1
makeOffers(executorId)
} else {
// Ignoring the update since we don't know about the executor.
@@ -124,34 +136,33 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
}

// Make fake resource offers on all executors
def makeOffers() {
launchTasks(scheduler.resourceOffers(
executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
private[this] def makeOffers() {
launchTasks(scheduler.resourceOffers(executorWorkerOffer))
}

// Make fake resource offers on just one executor
def makeOffers(executorId: String) {
private[this] def makeOffers(executorId: String) {
launchTasks(scheduler.resourceOffers(
Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
TaskSchedulerImpl.buildMapFromWorkerOffers(scheduler, Seq(executorWorkerOffer(executorId)))))
}

// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
private[this] def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
freeCores(task.executorId) -= 1
executorWorkerOffer(task.executorId).cores -= 1
executorActor(task.executorId) ! LaunchTask(task)
}
}

// Remove a disconnected slave from the cluster
def removeExecutor(executorId: String, reason: String) {
private[this] def removeExecutor(executorId: String, reason: String) {
if (executorActor.contains(executorId)) {
logInfo("Executor " + executorId + " disconnected, so removing it")
val numCores = freeCores(executorId)
val numCores = executorWorkerOffer(executorId).cores
addressToExecutorId -= executorAddress(executorId)
executorActor -= executorId
executorHost -= executorId
freeCores -= executorId
executorWorkerOffer -= executorId
totalCoreCount.addAndGet(-numCores)
scheduler.executorLost(executorId, SlaveLost(reason))
}
@@ -173,7 +184,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}

def stopExecutors() {
private[this] def stopExecutors() {
try {
if (driverActor != null) {
logInfo("Shutting down all executors")
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -202,10 +202,9 @@ private[spark] class MesosSchedulerBackend(
offer.getHostname,
getResource(offer.getResourcesList, "cpus").toInt)
}

// Call into the TaskSchedulerImpl
val taskLists = scheduler.resourceOffers(offerableWorkers)

// Call into the TaskSchedulerImpl
val taskLists = scheduler.resourceOffers(
TaskSchedulerImpl.buildMapFromWorkerOffers(scheduler, offerableWorkers))
// Build a list of Mesos tasks for each slave
val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
for ((taskList, index) <- taskLists.zipWithIndex) {
core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -67,7 +67,8 @@ private[spark] class LocalActor(

def reviveOffers() {
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
for (task <- scheduler.resourceOffers(offers).flatten) {
for (task <- scheduler.resourceOffers(
TaskSchedulerImpl.buildMapFromWorkerOffers(scheduler, offers)).flatten) {
freeCores -= 1
executor.launchTask(executorBackend, task.taskId, task.serializedTask)
}
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/util/collection/LRUMap.scala
@@ -0,0 +1,15 @@
package org.apache.spark.util.collection

import scala.collection.mutable
import java.util
import scala.collection.convert.Wrappers
import org.apache.commons.collections.map.{LRUMap => JLRUMap}

class LRUMap[K, V](maxSize: Int, underlying: util.Map[K, V]) extends Wrappers.JMapWrapper(underlying) {

def this(maxSize: Int) = this(maxSize, new JLRUMap(maxSize).asInstanceOf[util.Map[K, V]])
}


class LRUSyncMap[K, V](maxSize: Int, underlying: util.Map[K, V])
extends LRUMap[K, V](maxSize, underlying: util.Map[K, V]) with mutable.SynchronizedMap[K, V]
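
The new LRUMap above is a thin Scala Map facade (via Wrappers.JMapWrapper) over Apache Commons Collections' LRUMap, so get and put reorder entries toward the most-recently-used end, while iteration starts at the least-recently-used entry and does not reorder anything. A minimal sketch of that access-order behavior, which the scheduler change relies on; the expected outputs follow commons-collections' documented LRU semantics and are not asserted anywhere in this PR:

```scala
import org.apache.spark.util.collection.LRUMap

object LRUMapDemo {
  def main(args: Array[String]): Unit = {
    val offers = new LRUMap[String, Int](3)  // holds at most 3 entries
    offers += "exec1" -> 8
    offers += "exec2" -> 8
    offers += "exec3" -> 8
    // get() marks exec1 as most recently used; head is the least recently used entry.
    offers.get("exec1")
    println(offers.head._1)           // expected: exec2
    // Inserting a fourth entry evicts the current least recently used one (exec2).
    offers += "exec4" -> 8
    println(offers.contains("exec2")) // expected: false
  }
}
```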
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.FunSuite
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.FakeClock
import org.apache.spark.util.collection.LRUMap

class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
@@ -298,6 +299,32 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
}
}

test ("executors should be scheduled with LRU order when the feature is enabled") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc,
("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
sched.rootPool = new Pool("", sched.schedulingMode, 0, 0)
sched.schedulableBuilder = new FIFOSchedulableBuilder(sched.rootPool)
val executorWorkerOffer = new LRUMap[String, WorkerOffer](3)
executorWorkerOffer += "exec1" -> new WorkerOffer("exec1", "host1", 8)
executorWorkerOffer += "exec2" -> new WorkerOffer("exec2", "host2", 8)
executorWorkerOffer += "exec3" -> new WorkerOffer("exec3", "host3", 8)
val taskSet1 = createTaskSet(1)
val taskSet2 = createTaskSet(1)
val taskSet3 = createTaskSet(1)
val clock = new FakeClock
val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES, clock)
val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, clock)
val manager3 = new TaskSetManager(sched, taskSet3, MAX_TASK_FAILURES, clock)
sched.schedulableBuilder.addTaskSetManager(manager1, manager1.taskSet.properties)
sched.schedulableBuilder.addTaskSetManager(manager2, manager2.taskSet.properties)
sched.schedulableBuilder.addTaskSetManager(manager3, manager3.taskSet.properties)
sched.resourceOffers(executorWorkerOffer)
assert(sched.taskIdToExecutorId.values.toList(0) == "exec3")
assert(sched.taskIdToExecutorId.values.toList(1) == "exec2")
assert(sched.taskIdToExecutorId.values.toList(2) == "exec1")
}


/**
* Utility method to create a TaskSet, potentially setting a particular sequence of preferred
8 changes: 8 additions & 0 deletions docs/configuration.md
@@ -200,6 +200,14 @@ Apart from these, the following properties are also available, and may be useful
to use fair sharing instead of queueing jobs one after another. Useful for
multi-user services.
</td>
</tr>
<tr>
<td>spark.scheduler.lruOffer</td>
<td>false</td>
<td>
Whether to offer executor resources to the application in LRU (least recently used) order.
If set to false, tasks may be packed onto just a few nodes; if set to true, executors are
offered in LRU order so that tasks are spread more evenly across the cluster.
</td>
</tr>
<tr>
<td>spark.reducer.maxMbInFlight</td>
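
A minimal sketch of turning the new flag on from application code, using the standard SparkConf API; the master URL, app name, and job are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LruOfferApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")                    // placeholder master URL
      .setAppName("lru-offer-demo")             // placeholder app name
      .set("spark.scheduler.lruOffer", "true")  // enable LRU-ordered resource offers
    val sc = new SparkContext(conf)
    try {
      // Any job exercises the scheduler code path; the LRU ordering matters
      // once there are multiple executors to choose from.
      println(sc.parallelize(1 to 1000, 8).map(_ * 2).count())
    } finally {
      sc.stop()
    }
  }
}
```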