In yarn-cluster mode, executors can be added or killed.
lianhuiwang committed Jan 9, 2015
1 parent 167a5ab commit 6dfeeec
Showing 4 changed files with 24 additions and 9 deletions.
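The net effect of these changes is that the driver in yarn-cluster mode can now reach the ApplicationMaster actor and grow or shrink the executor pool, as yarn-client mode already could. A minimal usage sketch of the user-facing side, assuming SparkContext's developer-API methods requestExecutors and killExecutors (present in Spark of this era, but not part of this diff):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: explicitly resize the executor pool from a yarn-cluster application.
val sc = new SparkContext(new SparkConf().setAppName("dynamic-executors"))
sc.requestExecutors(3)        // ask the ApplicationMaster for 3 more executors
sc.killExecutors(Seq("1"))    // release the executor with ID "1"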
YarnSchedulerBackend.scala
@@ -41,10 +41,7 @@ private[spark] abstract class YarnSchedulerBackend(

   protected var totalExpectedExecutors = 0

-  private val yarnSchedulerActor: ActorRef =
-    actorSystem.actorOf(
-      Props(new YarnSchedulerActor),
-      name = YarnSchedulerBackend.ACTOR_NAME)
+  protected var yarnSchedulerActor: ActorRef = _

   private implicit val askTimeout = AkkaUtils.askTimeout(sc.conf)

@@ -94,12 +91,14 @@ private[spark] abstract class YarnSchedulerBackend(
   /**
    * An actor that communicates with the ApplicationMaster.
    */
-  private class YarnSchedulerActor extends Actor {
+  protected class YarnSchedulerActor(isDriver: Boolean) extends Actor {
     private var amActor: Option[ActorRef] = None

     override def preStart(): Unit = {
       // Listen for disassociation events
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+      if (!isDriver) {
+        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+      }
     }

     override def receive = {
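The receive block is elided by the diff, but it is where the two actors pair up: the ApplicationMaster's AMActor announces itself with RegisterClusterManager, and the driver-side actor then relays executor-count changes to it. A rough sketch of that forwarding, assuming the RegisterClusterManager, RequestExecutors and KillExecutors messages from CoarseGrainedClusterMessages; this is illustrative, not the literal committed code:

import akka.actor.{Actor, ActorRef}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{
  RegisterClusterManager, RequestExecutors, KillExecutors}

class SchedulerActorSketch extends Actor {
  // Reference to the ApplicationMaster's actor, filled in once it registers.
  private var amActor: Option[ActorRef] = None

  override def receive = {
    case RegisterClusterManager =>
      amActor = Some(sender)            // the AMActor introduces itself at startup
    case r: RequestExecutors =>
      amActor.foreach(_ forward r)      // let the AM ask YARN for more containers
    case k: KillExecutors =>
      amActor.foreach(_ forward k)      // let the AM release specific containers
  }
}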
ApplicationMaster.scala
@@ -245,6 +245,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         ApplicationMaster.EXIT_SC_NOT_INITED,
         "Timed out waiting for SparkContext.")
     } else {
+      val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+        SparkEnv.driverActorSystemName,
+        sc.getConf.get("spark.driver.host"),
+        sc.getConf.get("spark.driver.port"),
+        YarnSchedulerBackend.ACTOR_NAME)
+      actor = sc.env.actorSystem.actorOf(Props(new AMActor(driverUrl, true)), name = "YarnAM")
       registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
       userClassThread.join()
     }
@@ -404,7 +410,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       driverHost,
       driverPort.toString,
       YarnSchedulerBackend.ACTOR_NAME)
-    actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM")
+    actorSystem.actorOf(Props(new AMActor(driverUrl, false)), name = "YarnAM")
   }

   /** Add the Yarn IP filter that is required for properly securing the UI. */
@@ -464,7 +470,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   /**
    * Actor that communicates with the driver in client deploy mode.
    */
-  private class AMActor(driverUrl: String) extends Actor {
+  private class AMActor(driverUrl: String, isDriver: Boolean) extends Actor {
     var driver: ActorSelection = _

     override def preStart() = {
@@ -474,7 +480,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       // we can monitor Lifecycle Events.
       driver ! "Hello"
       driver ! RegisterClusterManager
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+      if (isDriver) {
+        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+      }
     }

     override def receive = {
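In cluster mode the AMActor is now created inside the SparkContext's own actor system with isDriver = true, and driverUrl points at the in-process YarnSchedulerBackend actor, so the same RegisterClusterManager handshake works whether the driver is local or remote. The AMActor's receive block (unchanged and elided here) is what turns the forwarded messages into YARN container requests; a rough sketch of that body, where the allocator method names requestTotalExecutors and killExecutor are assumptions rather than something shown in this diff:

// Rough sketch of the AM-side handling inside AMActor; not the literal committed code.
override def receive = {
  case RequestExecutors(requestedTotal) =>
    Option(allocator).foreach(_.requestTotalExecutors(requestedTotal)) // resize the container pool
    sender ! true
  case KillExecutors(executorIds) =>
    Option(allocator).foreach(a => executorIds.foreach(a.killExecutor)) // release specific containers
    sender ! true
}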
YarnClientSchedulerBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler.cluster

 import scala.collection.mutable.ArrayBuffer

+import akka.actor.Props
+
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}

 import org.apache.spark.{SparkException, Logging, SparkContext}
@@ -41,6 +43,8 @@ private[spark] class YarnClientSchedulerBackend(
    */
   override def start() {
     super.start()
+    yarnSchedulerActor = sc.env.actorSystem.actorOf(Props(new YarnSchedulerActor(false)),
+      name = YarnSchedulerBackend.ACTOR_NAME)
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
YarnClusterSchedulerBackend.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.scheduler.cluster

+import akka.actor.Props
+
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -29,6 +31,8 @@ private[spark] class YarnClusterSchedulerBackend(

   override def start() {
     super.start()
+    yarnSchedulerActor = sc.env.actorSystem.actorOf(Props(new YarnSchedulerActor(true)),
+      name = YarnSchedulerBackend.ACTOR_NAME)
     totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
     if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
       totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
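With the actor now created in YarnClusterSchedulerBackend.start(), dynamic allocation can drive the RequestExecutors/KillExecutors traffic automatically in cluster deploy mode. A configuration sketch using the standard spark.dynamicAllocation.* and spark.shuffle.service.enabled properties (ordinary Spark settings, not something introduced by this commit):

import org.apache.spark.SparkConf

// Sketch: let Spark grow and shrink the executor pool on YARN on its own.
val conf = new SparkConf()
  .setAppName("elastic-yarn-app")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.shuffle.service.enabled", "true") // keeps shuffle files available when executors are removed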
