From 69c348cbeb2c2cf6bb274ea6b60ec9c0628b46a5 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Wed, 30 Apr 2014 11:27:49 -0700 Subject: [PATCH] Cancel retryTimer on restart of Worker or AppClient --- .../org/apache/spark/deploy/client/AppClient.scala | 13 +++++++++---- .../org/apache/spark/deploy/worker/Worker.scala | 10 ++++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 8901806de9262..1b58c4d41bce7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -60,6 +60,7 @@ private[spark] class AppClient( var master: ActorSelection = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times var alreadyDead = false // To avoid calling listener.dead() multiple times + var retryTimer: Option[Cancellable] = None override def preStart() { context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) @@ -83,14 +84,13 @@ private[spark] class AppClient( def registerWithMaster() { tryRegisterAllMasters() - import context.dispatcher var retries = 0 - lazy val retryTimer: Cancellable = + retryTimer = Some { context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { retries += 1 if (registered) { - retryTimer.cancel() + retryTimer.foreach(_.cancel()) } else if (retries >= REGISTRATION_RETRIES) { logError("All masters are unresponsive! Giving up.") markDead() @@ -98,7 +98,7 @@ private[spark] class AppClient( tryRegisterAllMasters() } } - retryTimer // start timer + } } def changeMaster(url: String) { @@ -179,6 +179,11 @@ private[spark] class AppClient( alreadyDead = true } } + + override def postStop() { + retryTimer.foreach(_.cancel()) + } + } def start() { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index dd0a1360abe14..2442e3608b282 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -102,6 +102,8 @@ private[spark] class Worker( val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr) val workerSource = new WorkerSource(this) + var retryTimer: Option[Cancellable] = None + def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed @@ -163,13 +165,12 @@ private[spark] class Worker( def registerWithMaster() { tryRegisterAllMasters() - var retries = 0 - lazy val retryTimer: Cancellable = + retryTimer = Some { context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { retries += 1 if (registered) { - retryTimer.cancel() + retryTimer.foreach(_.cancel()) } else if (retries >= REGISTRATION_RETRIES) { logError("All masters are unresponsive! Giving up.") System.exit(1) @@ -177,7 +178,7 @@ private[spark] class Worker( tryRegisterAllMasters() } } - retryTimer // start timer + } } override def receive = { @@ -346,6 +347,7 @@ private[spark] class Worker( } override def postStop() { + retryTimer.foreach(_.cancel()) executors.values.foreach(_.kill()) drivers.values.foreach(_.kill()) webUi.stop()