Skip to content
This repository has been archived by the owner on Nov 30, 2019. It is now read-only.

Commit

Permalink
address andrewor14's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lianhuiwang committed Jan 14, 2015
1 parent 1b029a4 commit bbc4d5a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ private[spark] abstract class YarnSchedulerBackend(
/**
* An actor that communicates with the ApplicationMaster.
*/
protected class YarnSchedulerActor(isDriver: Boolean) extends Actor {
protected class YarnSchedulerActor(isClusterMode: Boolean) extends Actor {
private var amActor: Option[ActorRef] = None

override def preStart(): Unit = {
// Listen for disassociation events
if (!isDriver) {
if (!isClusterMode) {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,18 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
reporterThread = launchReporterThread()
}

/**
 * Create the AM actor system and start the AMActor that communicates with the
 * driver-side YarnSchedulerBackend actor.
 *
 * Side effects: assigns the `actorSystem` and `actor` fields of this ApplicationMaster.
 *
 * @param securityMgr security manager used when creating the AM actor system
 * @param host        host of the driver's actor system
 * @param port        port of the driver's actor system (kept as a String since it is
 *                    read straight out of SparkConf / user args by both callers)
 * @param isDriver    whether the driver runs inside this AM process (cluster mode);
 *                    forwarded to AMActor so it can behave accordingly
 */
private def runAMActor(securityMgr: SecurityManager, host: String, port: String,
  isDriver: Boolean): Unit = {
  actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
    conf = sparkConf, securityManager = securityMgr)._1
  val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
    SparkEnv.driverActorSystemName,
    host,
    port,
    YarnSchedulerBackend.ACTOR_NAME)
  // BUG FIX: the original hardcoded `false` here, silently discarding the
  // `isDriver` argument even though runDriver() passes `true` (matching the
  // pre-refactor call site `new AMActor(driverUrl, true)`). Forward the
  // parameter so cluster-mode behavior is preserved.
  actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isDriver)), name = "YarnAM")
}

private def runDriver(securityMgr: SecurityManager): Unit = {
addAmIpFilter()
userClassThread = startUserClass()
Expand All @@ -245,23 +257,15 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
ApplicationMaster.EXIT_SC_NOT_INITED,
"Timed out waiting for SparkContext.")
} else {
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf, securityManager = securityMgr)._1
val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
SparkEnv.driverActorSystemName,
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
YarnSchedulerBackend.ACTOR_NAME)
actor = actorSystem.actorOf(Props(new AMActor(driverUrl, true)), name = "YarnAM")
runAMActor(securityMgr, sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"), true)
registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
userClassThread.join()
}
}

private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf, securityManager = securityMgr)._1
actor = waitForSparkDriver()
waitForSparkDriver(securityMgr)
addAmIpFilter()
registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)

Expand Down Expand Up @@ -375,7 +379,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
}
}

private def waitForSparkDriver(): ActorRef = {
private def waitForSparkDriver(securityMgr: SecurityManager): ActorRef = {
logInfo("Waiting for Spark driver to be reachable.")
var driverUp = false
val hostport = args.userArgs(0)
Expand Down Expand Up @@ -407,12 +411,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)

val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
SparkEnv.driverActorSystemName,
driverHost,
driverPort.toString,
YarnSchedulerBackend.ACTOR_NAME)
actorSystem.actorOf(Props(new AMActor(driverUrl, false)), name = "YarnAM")
runAMActor(securityMgr, driverHost, driverPort.toString, false)
}

/** Add the Yarn IP filter that is required for properly securing the UI. */
Expand Down

0 comments on commit bbc4d5a

Please sign in to comment.