Merge branch 'master' of github.com:apache/spark into df-corr
brkyvz committed May 2, 2015
2 parents 4fe693b + ecc6eb5 commit 4b74b24
Showing 90 changed files with 3,250 additions and 513 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -15,6 +15,7 @@ TAGS
RELEASE
control
docs
docker.properties.template
fairscheduler.xml.template
spark-defaults.conf.template
log4j.properties
3 changes: 3 additions & 0 deletions conf/docker.properties.template
@@ -0,0 +1,3 @@
spark.mesos.executor.docker.image: <image built from `../docker/spark-mesos/Dockerfile`>
spark.mesos.executor.docker.volumes: /usr/local/lib:/host/usr/local/lib:ro
spark.mesos.executor.home: /opt/spark
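
For context, a minimal sketch (not part of this commit) of setting the same three Mesos Docker properties programmatically on a SparkConf; the image name is a placeholder rather than a published image:

import org.apache.spark.SparkConf

// Placeholder values: substitute an image built from docker/spark-mesos/Dockerfile.
val conf = new SparkConf()
  .set("spark.mesos.executor.docker.image", "my-org/spark-mesos:latest")
  .set("spark.mesos.executor.docker.volumes", "/usr/local/lib:/host/usr/local/lib:ro")
  .set("spark.mesos.executor.home", "/opt/spark")
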
101 changes: 49 additions & 52 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -27,13 +27,20 @@ import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
/**
* An agent that dynamically allocates and removes executors based on the workload.
*
* The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
* the scheduler queue is not drained in N seconds, then new executors are added. If the queue
* persists for another M seconds, then more executors are added and so on. The number added
* in each round increases exponentially from the previous round until an upper bound on the
* number of executors has been reached. The upper bound is based both on a configured property
* and on the number of tasks pending: the policy will never increase the number of executor
* requests past the number needed to handle all pending tasks.
* The ExecutorAllocationManager maintains a moving target number of executors which is periodically
* synced to the cluster manager. The target starts at a configured initial value and changes with
* the number of pending and running tasks.
*
* Decreasing the target number of executors happens when the current target is more than needed to
* handle the current load. The target number of executors is always truncated to the number of
* executors that could run all current running and pending tasks at once.
*
* Increasing the target number of executors happens in response to backlogged tasks waiting to be
* scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If
* the queue persists for another M seconds, then more executors are added and so on. The number
* added in each round increases exponentially from the previous round until an upper bound has been
* reached. The upper bound is based both on a configured property and on the current number of
* running and pending tasks, as described above.
*
* The rationale for the exponential increase is twofold: (1) Executors should be added slowly
* in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
@@ -105,8 +112,10 @@ private[spark] class ExecutorAllocationManager(
// Number of executors to add in the next round
private var numExecutorsToAdd = 1

// Number of executors that have been requested but have not registered yet
private var numExecutorsPending = 0
// The desired number of executors at this moment in time. If all our executors were to die, this
// is the number of executors we would immediately want from the cluster manager.
private var numExecutorsTarget =
conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)

// Executors that have been requested to be removed but have not been killed yet
private val executorsPendingToRemove = new mutable.HashSet[String]
@@ -199,13 +208,6 @@ private[spark] class ExecutorAllocationManager(
executor.awaitTermination(10, TimeUnit.SECONDS)
}

/**
* The number of executors we would have if the cluster manager were to fulfill all our existing
* requests.
*/
private def targetNumExecutors(): Int =
numExecutorsPending + executorIds.size - executorsPendingToRemove.size

/**
* The maximum number of executors we would need under the current load to satisfy all running
* and pending tasks, rounded up.
@@ -227,7 +229,7 @@
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis

addOrCancelExecutorRequests(now)
updateAndSyncNumExecutorsTarget(now)

removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
@@ -239,26 +241,28 @@
}

/**
* Updates our target number of executors and syncs the result with the cluster manager.
*
* Check to see whether our existing allocation and the requests we've made previously exceed our
* current needs. If so, let the cluster manager know so that it can cancel pending requests that
* are unneeded.
* current needs. If so, truncate our target and let the cluster manager know so that it can
* cancel pending requests that are unneeded.
*
* If not, and the add time has expired, see if we can request new executors and refresh the add
* time.
*
* @return the delta in the target number of executors.
*/
private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
val currentTarget = targetNumExecutors
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
val maxNeeded = maxNumExecutorsNeeded

if (maxNeeded < currentTarget) {
if (maxNeeded < numExecutorsTarget) {
// The target number exceeds the number we actually need, so stop adding new
// executors and inform the cluster manager to cancel the extra pending requests.
val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
client.requestTotalExecutors(newTotalExecutors)
// executors and inform the cluster manager to cancel the extra pending requests
val oldNumExecutorsTarget = numExecutorsTarget
numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
client.requestTotalExecutors(numExecutorsTarget)
numExecutorsToAdd = 1
updateNumExecutorsPending(newTotalExecutors)
numExecutorsTarget - oldNumExecutorsTarget
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
@@ -281,21 +285,30 @@
*/
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
// Do not request more executors if it would put our target over the upper bound
val currentTarget = targetNumExecutors
if (currentTarget >= maxNumExecutors) {
logDebug(s"Not adding executors because there are already ${executorIds.size} " +
s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
if (numExecutorsTarget >= maxNumExecutors) {
val numExecutorsPending = numExecutorsTarget - executorIds.size
logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
numExecutorsToAdd = 1
return 0
}

val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
val oldNumExecutorsTarget = numExecutorsTarget
// There's no point in wasting time ramping up to the number of executors we already have, so
// make sure our target is at least as much as our current allocation:
numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
// Boost our target with the number to add for this round:
numExecutorsTarget += numExecutorsToAdd
// Ensure that our target doesn't exceed what we need at the present moment:
numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
// Ensure that our target fits within configured bounds:
numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)

val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
if (addRequestAcknowledged) {
val delta = updateNumExecutorsPending(newTotalExecutors)
val delta = numExecutorsTarget - oldNumExecutorsTarget
logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
s" (new desired total will be $newTotalExecutors)")
s" (new desired total will be $numExecutorsTarget)")
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
numExecutorsToAdd * 2
} else {
@@ -304,23 +317,11 @@
delta
} else {
logWarning(
s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
0
}
}

/**
* Given the new target number of executors, update the number of pending executor requests,
* and return the delta from the old number of pending requests.
*/
private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
val newNumExecutorsPending =
newTotalExecutors - executorIds.size + executorsPendingToRemove.size
val delta = newNumExecutorsPending - numExecutorsPending
numExecutorsPending = newNumExecutorsPending
delta
}

/**
* Request the cluster manager to remove the given executor.
* Return whether the request is received.
@@ -372,10 +373,6 @@ private[spark] class ExecutorAllocationManager(
// as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
if (numExecutorsPending > 0) {
numExecutorsPending -= 1
logDebug(s"Decremented number of pending executors ($numExecutorsPending left)")
}
} else {
logWarning(s"Duplicate executor $executorId has registered")
}
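
The ramp-up policy described in the updated scaladoc can be illustrated with a small standalone sketch. This is not the manager itself; the bounds, the assumed load of 37 needed executors, and all names below are invented for illustration:

// Each round doubles the batch size, but the target is always clamped to the
// current need and to the configured bounds, mirroring addExecutors above.
object TargetRampSketch {
  def main(args: Array[String]): Unit = {
    val minExecutors = 0
    val maxExecutors = 100
    val maxNeeded = 37          // executors needed for current running + pending tasks
    var target = 1              // spark.dynamicAllocation.initialExecutors
    var numToAdd = 1            // numExecutorsToAdd

    while (target < math.min(maxNeeded, maxExecutors)) {
      val old = target
      target = math.max(math.min(target + numToAdd, math.min(maxNeeded, maxExecutors)), minExecutors)
      val delta = target - old
      // Double only when the full batch was granted; otherwise reset to 1.
      numToAdd = if (delta == numToAdd) numToAdd * 2 else 1
      println(s"granted $delta executor(s), new target $target, next batch $numToAdd")
    }
  }
}
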
17 changes: 15 additions & 2 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -150,8 +150,13 @@ import org.apache.spark.util.Utils
* authorization. If no filter is in place, the user is generally null and no authorization
* can take place.
*
* Connection encryption (SSL) configuration is organized hierarchically. The user can configure
* the default SSL settings which will be used for all the supported communication protocols unless
* When authentication is being used, encryption can also be enabled by setting the option
* spark.authenticate.enableSaslEncryption to true. This is only supported by communication
* channels that use the network-common library, and can be used as an alternative to SSL in those
* cases.
*
* SSL can be used for encryption for certain communication channels. The user can configure the
* default SSL settings which will be used for all the supported communication protocols unless
* they are overwritten by protocol specific settings. This way the user can easily provide the
* common settings for all the protocols without disabling the ability to configure each one
* individually.
@@ -412,6 +417,14 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
*/
def isAuthenticationEnabled(): Boolean = authOn

/**
* Checks whether SASL encryption should be enabled.
* @return Whether to enable SASL encryption when connecting to services that support it.
*/
def isSaslEncryptionEnabled(): Boolean = {
sparkConf.getBoolean("spark.authenticate.enableSaslEncryption", false)
}

/**
* Gets the user used for authenticating HTTP connections.
* For now use a single hardcoded user.
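
A minimal configuration sketch (not part of this diff) showing how the new flag fits alongside the existing authentication settings; the secret value is a placeholder:

import org.apache.spark.SparkConf

// SASL encryption only applies to channels backed by the network-common
// library, and only when authentication itself is enabled.
val conf = new SparkConf()
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "change-me")            // placeholder secret
  .set("spark.authenticate.enableSaslEncryption", "true")
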
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -347,6 +347,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
value
}

/** Control our logLevel. This overrides any user-defined log settings.
* @param logLevel The desired log level as a string.
* Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
*/
def setLogLevel(logLevel: String) {
val validLevels = Seq("ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN")
if (!validLevels.contains(logLevel)) {
throw new IllegalArgumentException(
s"Supplied level $logLevel did not match one of: ${validLevels.mkString(",")}")
}
Utils.setLogLevel(org.apache.log4j.Level.toLevel(logLevel))
}

try {
_conf = config.clone()
_conf.validateSettings()
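
A short usage sketch for the new setter, assuming a local master purely for illustration:

import org.apache.spark.{SparkConf, SparkContext}

// Quiet INFO logging at runtime; an invalid level such as "VERBOSE" would
// throw IllegalArgumentException per the validation above.
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("log-level-demo"))
sc.setLogLevel("WARN")
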
@@ -755,6 +755,14 @@ class JavaSparkContext(val sc: SparkContext)
*/
def getLocalProperty(key: String): String = sc.getLocalProperty(key)

/** Control our logLevel. This overrides any user-defined log settings.
* @param logLevel The desired log level as a string.
* Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
*/
def setLogLevel(logLevel: String) {
sc.setLogLevel(logLevel)
}

/**
* Assigns a group ID to all the jobs started by this thread until the group ID is set to a
* different value or cleared.
73 changes: 55 additions & 18 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -17,6 +17,7 @@

package org.apache.spark.deploy

import scala.collection.mutable.HashSet
import scala.concurrent._

import akka.actor._
@@ -31,21 +32,24 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}

/**
* Proxy that relays messages to the driver.
*
* We currently don't support retry if submission fails. In HA mode, the client submits the
* request to all masters and sees which one can handle it.
*/
private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
extends Actor with ActorLogReceive with Logging {

var masterActor: ActorSelection = _
private val masterActors = driverArgs.masters.map { m =>
context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system)))
}
private val lostMasters = new HashSet[Address]
private var activeMasterActor: ActorSelection = null

val timeout = RpcUtils.askTimeout(conf)

override def preStart(): Unit = {
masterActor = context.actorSelection(
Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system)))

context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")

driverArgs.cmd match {
case "launch" =>
// TODO: We could add an env variable here and intercept it in `sc.addJar` that would
@@ -79,11 +83,17 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
driverArgs.supervise,
command)

masterActor ! RequestSubmitDriver(driverDescription)
// This assumes only one Master is active at a time
for (masterActor <- masterActors) {
masterActor ! RequestSubmitDriver(driverDescription)
}

case "kill" =>
val driverId = driverArgs.driverId
masterActor ! RequestKillDriver(driverId)
// This assumes only one Master is active at a time
for (masterActor <- masterActors) {
masterActor ! RequestKillDriver(driverId)
}
}
}

@@ -92,10 +102,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
println("... waiting before polling master for driver state")
Thread.sleep(5000)
println("... polling master for driver state")
val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout)
.mapTo[DriverStatusResponse]
val statusResponse = Await.result(statusFuture, timeout)

statusResponse.found match {
case false =>
println(s"ERROR: Cluster master did not recognize $driverId")
@@ -122,20 +131,46 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)

case SubmitDriverResponse(success, driverId, message) =>
println(message)
if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
if (success) {
activeMasterActor = context.actorSelection(sender.path)
pollAndReportStatus(driverId.get)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}


case KillDriverResponse(driverId, success, message) =>
println(message)
if (success) pollAndReportStatus(driverId) else System.exit(-1)
if (success) {
activeMasterActor = context.actorSelection(sender.path)
pollAndReportStatus(driverId)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}

case DisassociatedEvent(_, remoteAddress, _) =>
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
System.exit(-1)
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master $remoteAddress.")
lostMasters += remoteAddress
// Note that this heuristic does not account for the fact that a Master can recover within
// the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This
// is not currently a concern, however, because this client does not retry submissions.
if (lostMasters.size >= masterActors.size) {
println("No master is available, exiting.")
System.exit(-1)
}
}

case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
println(s"Cause was: $cause")
System.exit(-1)
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master ($remoteAddress).")
println(s"Cause was: $cause")
lostMasters += remoteAddress
if (lostMasters.size >= masterActors.size) {
println("No master is available, exiting.")
System.exit(-1)
}
}
}
}

@@ -163,7 +198,9 @@ object Client {
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

// Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem))
for (m <- driverArgs.masters) {
Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem))
}
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

actorSystem.awaitTermination()
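
The lost-master bookkeeping above boils down to a counting heuristic: remember every master address that has disassociated and give up only when all of them are gone. A standalone sketch of that idea, with hypothetical names and plain string addresses in place of actor addresses:

import scala.collection.mutable.HashSet

class MasterLossTracker(allMasters: Seq[String]) {
  private val lostMasters = new HashSet[String]

  /** Records a lost master; returns true once no master is left to talk to. */
  def onDisassociated(remoteAddress: String): Boolean = {
    if (!lostMasters.contains(remoteAddress)) {
      println(s"Error connecting to master $remoteAddress.")
      lostMasters += remoteAddress
    }
    lostMasters.size >= allMasters.size
  }
}
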