
Cancel retryTimer on restart of Worker or AppClient
markhamstra committed Apr 30, 2014
1 parent 55100da commit 69c348c
Showing 2 changed files with 15 additions and 8 deletions.
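
Both files apply the same fix. Previously, registerWithMaster() in AppClient.scala and Worker.scala kept the registration-retry task only in a local lazy val Cancellable, which the enclosing actor's postStop() could not reach; when Akka restarted the Worker or AppClient actor, registration started over while the old timer kept firing. The commit promotes the timer to a retryTimer: Option[Cancellable] field and cancels it in postStop(), which Akka also runs on the outgoing instance during a restart.

A minimal sketch of the pattern (not the Spark code itself), assuming the classic Akka Actor and Scheduler APIs; RegisteringActor, the "registered" message, tryRegister, and the retry constants are illustrative names, not identifiers from this commit:

import akka.actor.{Actor, Cancellable}
import scala.concurrent.duration._

class RegisteringActor extends Actor {
  import context.dispatcher                      // ExecutionContext for the scheduler

  private val REGISTRATION_TIMEOUT = 20.seconds  // illustrative values
  private val REGISTRATION_RETRIES = 3

  private var registered = false
  // Held in a field rather than a local lazy val so postStop() can reach it.
  private var retryTimer: Option[Cancellable] = None

  private def tryRegister(): Unit = {
    // stub: send registration messages to the master(s) here
  }

  override def preStart(): Unit = {
    tryRegister()
    var retries = 0
    retryTimer = Some {
      context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
        retries += 1
        if (registered) retryTimer.foreach(_.cancel())      // registered: stop retrying
        else if (retries >= REGISTRATION_RETRIES) context.stop(self)
        else tryRegister()
      }
    }
  }

  // Akka runs postStop() on the outgoing instance during a restart as well as on a
  // normal stop, so the timer of a failed incarnation cannot keep firing.
  override def postStop(): Unit = retryTimer.foreach(_.cancel())

  def receive = {
    case "registered" => registered = true
  }
}
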
core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala (9 additions & 4 deletions)
@@ -60,6 +60,7 @@ private[spark] class AppClient(
     var master: ActorSelection = null
     var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
     var alreadyDead = false // To avoid calling listener.dead() multiple times
+    var retryTimer: Option[Cancellable] = None
 
     override def preStart() {
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -83,22 +84,21 @@ private[spark] class AppClient(
 
     def registerWithMaster() {
       tryRegisterAllMasters()
-
       import context.dispatcher
       var retries = 0
-      lazy val retryTimer: Cancellable =
+      retryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
           retries += 1
           if (registered) {
-            retryTimer.cancel()
+            retryTimer.foreach(_.cancel())
           } else if (retries >= REGISTRATION_RETRIES) {
             logError("All masters are unresponsive! Giving up.")
             markDead()
           } else {
             tryRegisterAllMasters()
           }
         }
-      retryTimer // start timer
+      }
     }
 
     def changeMaster(url: String) {
@@ -179,6 +179,11 @@ private[spark] class AppClient(
         alreadyDead = true
       }
     }
+
+    override def postStop() {
+      retryTimer.foreach(_.cancel())
+    }
+
   }
 
   def start() {
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala (6 additions & 4 deletions)
@@ -102,6 +102,8 @@ private[spark] class Worker(
   val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
   val workerSource = new WorkerSource(this)
 
+  var retryTimer: Option[Cancellable] = None
+
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
@@ -163,21 +165,20 @@ private[spark] class Worker(
 
   def registerWithMaster() {
     tryRegisterAllMasters()
-
     var retries = 0
-    lazy val retryTimer: Cancellable =
+    retryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
         retries += 1
         if (registered) {
-          retryTimer.cancel()
+          retryTimer.foreach(_.cancel())
         } else if (retries >= REGISTRATION_RETRIES) {
           logError("All masters are unresponsive! Giving up.")
           System.exit(1)
         } else {
           tryRegisterAllMasters()
         }
       }
-    retryTimer // start timer
+    }
   }
 
   override def receive = {
@@ -346,6 +347,7 @@ private[spark] class Worker(
   }
 
   override def postStop() {
+    retryTimer.foreach(_.cancel())
     executors.values.foreach(_.kill())
     drivers.values.foreach(_.kill())
     webUi.stop()
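Why postStop() is the right hook: during a restart Akka stops the failing instance, running its postStop(), before constructing and starting the replacement, so a timer cancelled there cannot leak across incarnations. A self-contained sketch of that behavior, again assuming the classic Akka APIs; TickingActor, the "boom" message, and RestartDemo are illustrative names:

import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import scala.concurrent.duration._

class TickingActor extends Actor {
  import context.dispatcher

  private var tick: Option[Cancellable] = None

  override def preStart(): Unit = {
    // Each incarnation schedules its own timer, just as registerWithMaster() does.
    tick = Some(context.system.scheduler.schedule(0.seconds, 1.second) {
      println(s"tick from incarnation ${System.identityHashCode(this)}")
    })
  }

  // Runs on normal stop and on restart: without this, the first incarnation's
  // timer would keep printing alongside the new one's.
  override def postStop(): Unit = tick.foreach(_.cancel())

  def receive = {
    case "boom" => throw new RuntimeException("force a restart")
  }
}

object RestartDemo extends App {
  val system = ActorSystem("restart-demo")
  val ticker = system.actorOf(Props[TickingActor], "ticker")
  Thread.sleep(2500)
  ticker ! "boom"                 // default supervision restarts the actor
  Thread.sleep(2500)              // only the new incarnation's ticks appear
  system.terminate()
}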
