diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index b145b36b11b69..138c3dd1da2e5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -41,7 +41,10 @@ private[spark] abstract class YarnSchedulerBackend( protected var totalExpectedExecutors = 0 - protected var yarnSchedulerActor: ActorRef = _ + private val yarnSchedulerActor: ActorRef = + actorSystem.actorOf( + Props(new YarnSchedulerActor), + name = YarnSchedulerBackend.ACTOR_NAME) private implicit val askTimeout = AkkaUtils.askTimeout(sc.conf) @@ -91,14 +94,12 @@ private[spark] abstract class YarnSchedulerBackend( /** * An actor that communicates with the ApplicationMaster. */ - protected class YarnSchedulerActor(isClusterMode: Boolean) extends Actor { + protected class YarnSchedulerActor extends Actor { private var amActor: Option[ActorRef] = None override def preStart(): Unit = { // Listen for disassociation events - if (!isClusterMode) { - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) - } + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } override def receive = { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index e1f12b32182a2..fc307cee4521f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -239,12 +239,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, * so AMActor don't monitor lifecycle of driver. */ private def runAMActor( - securityMgr: SecurityManager, host: String, port: String, isDriver: Boolean): Unit = { - actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, - conf = sparkConf, securityManager = securityMgr)._1 val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( SparkEnv.driverActorSystemName, host, @@ -267,8 +264,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, ApplicationMaster.EXIT_SC_NOT_INITED, "Timed out waiting for SparkContext.") } else { - runAMActor(securityMgr, sc.getConf.get("spark.driver.host"), - sc.getConf.get("spark.driver.port"), true) + actorSystem = sc.env.actorSystem + runAMActor(sc.getConf.get("spark.driver.host"), sc.getConf.get("spark.driver.port"), true) registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr) userClassThread.join() } @@ -421,7 +418,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkConf.set("spark.driver.host", driverHost) sparkConf.set("spark.driver.port", driverPort.toString) - runAMActor(securityMgr, driverHost, driverPort.toString, false) + actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, + conf = sparkConf, securityManager = securityMgr)._1 + runAMActor(driverHost, driverPort.toString, false) } /** Add the Yarn IP filter that is required for properly securing the UI. */ @@ -491,7 +490,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, // we can monitor Lifecycle Events. driver ! "Hello" driver ! RegisterClusterManager - if (isDriver) { + if (!isDriver) { context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } } diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index c2b7fc3deff6b..77d5e40d8ecfa 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -43,8 +43,6 @@ private[spark] class YarnClientSchedulerBackend( */ override def start() { super.start() - yarnSchedulerActor = sc.env.actorSystem.actorOf(Props(new YarnSchedulerActor(false)), - name = YarnSchedulerBackend.ACTOR_NAME) val driverHost = conf.get("spark.driver.host") val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index c90fe88d8b86c..706c441668b42 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -31,8 +31,6 @@ private[spark] class YarnClusterSchedulerBackend( override def start() { super.start() - yarnSchedulerActor = sc.env.actorSystem.actorOf(Props(new YarnSchedulerActor(true)), - name = YarnSchedulerBackend.ACTOR_NAME) totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))