Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2712 from mesosphere/mv/fix_2509
Browse files Browse the repository at this point in the history
Fixes #2509 by listening for connection drops and abdicating directly
  • Loading branch information
Peter Kolloch committed Dec 14, 2015
2 parents e3e867f + d7954c1 commit 48b1e4b
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 66 deletions.
3 changes: 2 additions & 1 deletion src/main/scala/mesosphere/marathon/MarathonModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ import org.apache.zookeeper.ZooDefs.Ids
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.collection.immutable.Seq
import scala.concurrent.Await
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import scala.collection.immutable.Seq

object ModuleNames {
final val CANDIDATE = "CANDIDATE"
Expand Down Expand Up @@ -84,6 +84,7 @@ class MarathonModule(conf: MarathonConf, http: HttpConf, zk: ZooKeeperClient)
bind(classOf[MarathonLeaderInfoMetrics]).in(Scopes.SINGLETON)
bind(classOf[MarathonScheduler]).in(Scopes.SINGLETON)
bind(classOf[MarathonSchedulerService]).in(Scopes.SINGLETON)
bind(classOf[LeadershipAbdication]).to(classOf[MarathonSchedulerService])
bind(classOf[LeaderInfo]).to(classOf[MarathonLeaderInfo]).in(Scopes.SINGLETON)
bind(classOf[TaskFactory]).to(classOf[DefaultTaskFactory]).in(Scopes.SINGLETON)

Expand Down
47 changes: 28 additions & 19 deletions src/main/scala/mesosphere/marathon/MarathonSchedulerService.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mesosphere.marathon

import java.util.concurrent.CountDownLatch

import java.util.concurrent.atomic.AtomicBoolean
import java.util.{ Timer, TimerTask }
import javax.inject.{ Inject, Named }
Expand Down Expand Up @@ -52,24 +51,32 @@ trait LeadershipCallback {
def onDefeated: Future[Unit]
}

/**
* Minimal trait to abdicate leadership from external components (e.g. zk connection listener)
*/
trait LeadershipAbdication {
def abdicateLeadership(): Unit
}

/**
* Wrapper class for the scheduler
*/
class MarathonSchedulerService @Inject() (
leadershipCoordinator: LeadershipCoordinator,
healthCheckManager: HealthCheckManager,
@Named(ModuleNames.CANDIDATE) candidate: Option[Candidate],
config: MarathonConf,
frameworkIdUtil: FrameworkIdUtil,
@Named(ModuleNames.LEADER_ATOMIC_BOOLEAN) leader: AtomicBoolean,
appRepository: AppRepository,
taskTracker: TaskTracker,
driverFactory: SchedulerDriverFactory,
system: ActorSystem,
migration: Migration,
@Named("schedulerActor") schedulerActor: ActorRef,
@Named(EventModule.busName) eventStream: EventStream,
leadershipCallbacks: Seq[LeadershipCallback] = Seq.empty) extends AbstractExecutionThreadService with Leader {
leadershipCoordinator: LeadershipCoordinator,
healthCheckManager: HealthCheckManager,
@Named(ModuleNames.CANDIDATE) candidate: Option[Candidate],
config: MarathonConf,
frameworkIdUtil: FrameworkIdUtil,
@Named(ModuleNames.LEADER_ATOMIC_BOOLEAN) leader: AtomicBoolean,
appRepository: AppRepository,
taskTracker: TaskTracker,
driverFactory: SchedulerDriverFactory,
system: ActorSystem,
migration: Migration,
@Named("schedulerActor") schedulerActor: ActorRef,
@Named(EventModule.busName) eventStream: EventStream,
leadershipCallbacks: Seq[LeadershipCallback] = Seq.empty)
extends AbstractExecutionThreadService with Leader with LeadershipAbdication {

import mesosphere.util.ThreadPoolContext.context

Expand Down Expand Up @@ -329,12 +336,14 @@ class MarathonSchedulerService @Inject() (
}

def abdicateLeadership(): Unit = {
log.info("Abdicating")
if (leader.get()) {
log.info("Abdicating")

leadershipCoordinator.stop()
leadershipCoordinator.stop()

// To abdicate we defeat our leadership
defeatLeadership()
// To abdicate we defeat our leadership
defeatLeadership()
}
}

var offerLeadershipBackOff = 0.5.seconds
Expand Down
7 changes: 5 additions & 2 deletions src/main/scala/mesosphere/marathon/core/CoreModuleImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mesosphere.marathon.core

import akka.actor.ActorSystem
import com.google.inject.Inject
import com.twitter.common.zookeeper.ZooKeeperClient
import mesosphere.marathon.api.LeaderInfo
import mesosphere.marathon.core.auth.AuthModule
import mesosphere.marathon.core.base.{ ActorsModule, Clock, ShutdownHooks }
Expand All @@ -16,7 +17,7 @@ import mesosphere.marathon.core.task.tracker.TaskTrackerModule
import mesosphere.marathon.metrics.Metrics
import mesosphere.marathon.state.AppRepository
import mesosphere.marathon.tasks.{ TaskFactory, TaskTracker }
import mesosphere.marathon.{ MarathonConf, MarathonSchedulerDriverHolder }
import mesosphere.marathon.{ LeadershipAbdication, MarathonConf, MarathonSchedulerDriverHolder }

import scala.util.Random

Expand All @@ -28,6 +29,8 @@ import scala.util.Random
*/
class CoreModuleImpl @Inject() (
// external dependencies still wired by guice
zk: ZooKeeperClient,
leader: LeadershipAbdication,
marathonConf: MarathonConf,
metrics: Metrics,
actorSystem: ActorSystem,
Expand All @@ -44,7 +47,7 @@ class CoreModuleImpl @Inject() (
private[this] lazy val shutdownHookModule = ShutdownHooks()
private[this] lazy val actorsModule = new ActorsModule(shutdownHookModule, actorSystem)

override lazy val leadershipModule = new LeadershipModule(actorsModule.actorRefFactory)
override lazy val leadershipModule = new LeadershipModule(actorsModule.actorRefFactory, zk, leader)

// TASKS

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package mesosphere.marathon.core.leadership

import akka.actor.{ ActorRef, ActorRefFactory, Props }
import mesosphere.marathon.core.leadership.impl.{
LeadershipCoordinatorDelegate,
LeadershipCoordinatorActor,
WhenLeaderActor
}
import com.twitter.common.zookeeper.ZooKeeperClient
import mesosphere.marathon.LeadershipAbdication
import mesosphere.marathon.core.leadership.impl._

/**
* This module provides a utility function for starting actors only when our instance is the current leader.
Expand All @@ -14,7 +12,7 @@ import mesosphere.marathon.core.leadership.impl.{
* In addition, it exports the coordinator which coordinates the activity performed when elected or stopped.
* The leadership election logic needs to call the appropriate methods for this module to work.
*/
class LeadershipModule(actorRefFactory: ActorRefFactory) {
class LeadershipModule(actorRefFactory: ActorRefFactory, zk: ZooKeeperClient, leader: LeadershipAbdication) {

private[this] var whenLeaderRefs = Set.empty[ActorRef]
private[this] var started: Boolean = false
Expand Down Expand Up @@ -50,4 +48,9 @@ class LeadershipModule(actorRefFactory: ActorRefFactory) {
val actorRef = actorRefFactory.actorOf(props, "leaderShipCoordinator")
new LeadershipCoordinatorDelegate(actorRef)
}

/**
* Register this actor by default.
*/
startWhenLeader(AbdicateOnConnectionLossActor.props(zk, leader), "AbdicateOnConnectionLoss")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package mesosphere.marathon.core.leadership.impl

import akka.actor.{ Actor, ActorLogging, Props }
import com.twitter.common.zookeeper.ZooKeeperClient
import mesosphere.marathon.LeadershipAbdication
import org.apache.zookeeper.{ ZooKeeper, WatchedEvent, Watcher }
import AbdicateOnConnectionLossActor._

private[leadership] object AbdicateOnConnectionLossActor {
def props(zk: ZooKeeperClient, leader: LeadershipAbdication): Props = {
Props(new AbdicateOnConnectionLossActor(zk, leader))
}

private val connectionDropped = Set(
Watcher.Event.KeeperState.Disconnected,
Watcher.Event.KeeperState.Expired
)
}

/**
* Register as ZK Listener and abdicates leadership on connection loss.
*/
private[impl] class AbdicateOnConnectionLossActor(zk: ZooKeeperClient,
leader: LeadershipAbdication) extends Actor with ActorLogging {

private[impl] val watcher = new Watcher {
val reference = self
override def process(event: WatchedEvent): Unit = reference ! event
}

override def preStart(): Unit = {
log.info("Register as ZK Listener")
zk.register(watcher)
//make sure, we are connected so we can act on subsequent events
if (zk.get().getState != ZooKeeper.States.CONNECTED) leader.abdicateLeadership()
}
override def postStop(): Unit = {
log.info("Unregister as ZK Listener")
zk.unregister(watcher)
}

def disconnected(): Unit = {
log.warning("ZooKeeper connection has been dropped. Abdicate Leadership.")
leader.abdicateLeadership()
}

override def receive: Receive = {
case event: WatchedEvent if connectionDropped.contains(event.getState) => disconnected()
case event: WatchedEvent => log.info(s"Received ZooKeeper Status event: $event")
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package mesosphere.marathon.core.leadership

import akka.actor.{ ActorRefFactory, ActorRef, Props }
import com.twitter.common.zookeeper.ZooKeeperClient
import mesosphere.marathon.LeadershipAbdication
import mesosphere.marathon.core.base.{ ActorsModule, ShutdownHooks }
import mesosphere.marathon.test.Mockito

/**
* Provides an implementation of the [[LeadershipModule]] which assumes that we are always the leader.
*
* This simplifies tests.
*/
object AlwaysElectedLeadershipModule {
object AlwaysElectedLeadershipModule extends Mockito {
def apply(shutdownHooks: ShutdownHooks): LeadershipModule = {
val actorsModule = new ActorsModule(shutdownHooks)
new AlwaysElectedLeadershipModule(actorsModule.actorRefFactory)
val zk = mock[ZooKeeperClient]
val leader = mock[LeadershipAbdication]
new AlwaysElectedLeadershipModule(actorsModule.actorRefFactory, zk, leader)
}
}

private class AlwaysElectedLeadershipModule(actorRefFactory: ActorRefFactory) extends LeadershipModule(actorRefFactory) {
private class AlwaysElectedLeadershipModule(actorRefFactory: ActorRefFactory, zk: ZooKeeperClient, leader: LeadershipAbdication)
extends LeadershipModule(actorRefFactory, zk, leader) {
override def startWhenLeader(props: Props, name: String, preparedOnStart: Boolean = true): ActorRef =
actorRefFactory.actorOf(props, name)
override def coordinator(): LeadershipCoordinator = ???
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package mesosphere.marathon.core.leadership.impl

import akka.actor.ActorSystem
import akka.testkit.{ TestActorRef, TestKit }
import com.twitter.common.zookeeper.ZooKeeperClient
import mesosphere.marathon.test.Mockito
import mesosphere.marathon.{ LeadershipAbdication, MarathonSpec }
import org.apache.zookeeper.{ ZooKeeper, WatchedEvent, Watcher }
import org.scalatest.GivenWhenThen

class AbdicateOnConnectionLossActorTest extends TestKit(ActorSystem("test")) with MarathonSpec with Mockito with GivenWhenThen {

test("register as zk listener on start") {
Given("ZK and leader refs")
val leader = mock[LeadershipAbdication]

When("The actor is created")
val actor = TestActorRef(AbdicateOnConnectionLossActor.props(zk, leader))

Then("register is called")
verify(zk).register(any)
}

test("zk disconnect events lead to abdication") {
Given("A started AbdicateOnConnectionLossActor")
val leader = mock[LeadershipAbdication]
val actor = TestActorRef[AbdicateOnConnectionLossActor](AbdicateOnConnectionLossActor.props(zk, leader))

When("The actor is killed")
val disconnected = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, "")
actor.underlyingActor.watcher.process(disconnected)

Then("Abdication is called")
verify(leader).abdicateLeadership()
}

test("other zk events do not lead to abdication") {
Given("A started AbdicateOnConnectionLossActor")
val leader = mock[LeadershipAbdication]
val actor = TestActorRef[AbdicateOnConnectionLossActor](AbdicateOnConnectionLossActor.props(zk, leader))

When("An event is fired, that is not a disconnected event")
val authFailed = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, "")
actor.underlyingActor.watcher.process(authFailed)

Then("Abdication is _NOT_ called")
verify(leader, never).abdicateLeadership()
}

var zk: ZooKeeperClient = _

before {
val zookeeper = mock[ZooKeeper]
zookeeper.getState returns ZooKeeper.States.CONNECTED
zk = mock[ZooKeeperClient]
zk.get() returns zookeeper
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package mesosphere.marathon.integration

import mesosphere.marathon.integration.setup.{ IntegrationFunSuite, MarathonClusterIntegrationTest, WaitTestSupport }
import mesosphere.marathon.integration.setup.{ ProcessKeeper, IntegrationFunSuite, MarathonClusterIntegrationTest, WaitTestSupport }
import org.scalatest.{ GivenWhenThen, Matchers }

import scala.concurrent.duration._
Expand Down Expand Up @@ -50,4 +50,24 @@ class LeaderIntegrationTest extends IntegrationFunSuite
And("the leader must have changed")
WaitTestSupport.waitUntil("the leader changes", 30.seconds) { marathon.leader().value != leader }
}

test("abdicate if the zk connection is dropped") {
Given("a leader")
WaitTestSupport.waitUntil("a leader has been elected", 30.seconds) { marathon.leader().code == 200 }

When("ZooKeeper dies")
ProcessKeeper.stopProcess("zookeeper")

Then("There is no leader in the cluster")
WaitTestSupport.waitUntil("a leader has been abdicated", 30.seconds) {
//TODO: should it be always 404?
Seq(404, 503).contains(marathon.leader().code)
}

When("Zookeeper starts again")
startZooKeeperProcess(wipeWorkDir = false)

Then("A new leader is elected")
WaitTestSupport.waitUntil("a leader has been elected", 30.seconds) { marathon.leader().code == 200 }
}
}
Loading

0 comments on commit 48b1e4b

Please sign in to comment.