From a688f8a235f47161e63eb4d86996adea5c8b23c3 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Wed, 25 Nov 2015 10:17:35 +0100 Subject: [PATCH] Fixes #2509 by listening for connection drops and abdicating directly --- .../mesosphere/marathon/MarathonModule.scala | 43 +++++++++++++++++-- .../integration/LeaderIntegrationTest.scala | 22 +++++++++- .../integration/setup/ProcessKeeper.scala | 27 ++++++++---- 3 files changed, 80 insertions(+), 12 deletions(-) diff --git a/src/main/scala/mesosphere/marathon/MarathonModule.scala b/src/main/scala/mesosphere/marathon/MarathonModule.scala index 968a75b4430..75595abf7d2 100644 --- a/src/main/scala/mesosphere/marathon/MarathonModule.scala +++ b/src/main/scala/mesosphere/marathon/MarathonModule.scala @@ -20,6 +20,7 @@ import mesosphere.chaos.http.HttpConf import mesosphere.marathon.Protos.MarathonTask import mesosphere.marathon.api.LeaderInfo import mesosphere.marathon.core.launchqueue.LaunchQueue +import mesosphere.marathon.core.leadership.LeadershipCoordinator import mesosphere.marathon.event.http._ import mesosphere.marathon.event.{ EventModule, HistoryActor } import mesosphere.marathon.health.{ HealthCheckManager, MarathonHealthCheckManager } @@ -34,15 +35,17 @@ import mesosphere.util.state.mesos.MesosStateStore import mesosphere.util.state.zk.{ CompressionConf, ZKStore } import mesosphere.util.state.{ FrameworkId, FrameworkIdUtil, PersistentStore, _ } import org.apache.mesos.state.ZooKeeperState -import org.apache.zookeeper.ZooDefs import org.apache.zookeeper.ZooDefs.Ids +import org.apache.zookeeper.{ WatchedEvent, Watcher, ZooDefs } import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ +import scala.collection.immutable.Seq import scala.concurrent.Await import scala.reflect.ClassTag import scala.util.control.NonFatal -import scala.collection.immutable.Seq + +//scalastyle:off number.of.methods parameter.number object ModuleNames { final val CANDIDATE = "CANDIDATE" @@ -83,7 +86,6 @@ class 
MarathonModule(conf: MarathonConf, http: HttpConf, zk: ZooKeeperClient) bind(classOf[SchedulerDriverFactory]).to(classOf[MesosSchedulerDriverFactory]).in(Scopes.SINGLETON) bind(classOf[MarathonLeaderInfoMetrics]).in(Scopes.SINGLETON) bind(classOf[MarathonScheduler]).in(Scopes.SINGLETON) - bind(classOf[MarathonSchedulerService]).in(Scopes.SINGLETON) bind(classOf[LeaderInfo]).to(classOf[MarathonLeaderInfo]).in(Scopes.SINGLETON) bind(classOf[TaskFactory]).to(classOf[DefaultTaskFactory]).in(Scopes.SINGLETON) @@ -103,6 +105,41 @@ class MarathonModule(conf: MarathonConf, http: HttpConf, zk: ZooKeeperClient) .toInstance(leader) } + @Provides + @Singleton + def provideMarathonSchedulerService( + zk: ZooKeeperClient, + leadershipCoordinator: LeadershipCoordinator, + healthCheckManager: HealthCheckManager, + @Named(ModuleNames.CANDIDATE) candidate: Option[Candidate], + config: MarathonConf, + frameworkIdUtil: FrameworkIdUtil, + @Named(ModuleNames.LEADER_ATOMIC_BOOLEAN) leader: AtomicBoolean, + appRepository: AppRepository, + taskTracker: TaskTracker, + driverFactory: SchedulerDriverFactory, + system: ActorSystem, + migration: Migration, + @Named("schedulerActor") schedulerActor: ActorRef, + @Named(EventModule.busName) eventStream: EventStream, + leadershipCallbacks: Seq[LeadershipCallback] = Seq.empty): MarathonSchedulerService = { + + val service = new MarathonSchedulerService(leadershipCoordinator, healthCheckManager, candidate, config, + frameworkIdUtil, leader, appRepository, taskTracker, driverFactory, system, migration, schedulerActor, + eventStream, leadershipCallbacks) + + // The current candidate implementation does not handle ZK connection loss + // We register a listener on the candidate zk connection and handle disconnections explicitly + zk.get().register(new Watcher { + override def process(event: WatchedEvent): Unit = event.getState match { + case Watcher.Event.KeeperState.Disconnected if leader.get() => service.abdicateLeadership() + case _ => + } + }) + + 
service + } + @Provides @Singleton def provideMesosLeaderInfo(): MesosLeaderInfo = { diff --git a/src/test/scala/mesosphere/marathon/integration/LeaderIntegrationTest.scala b/src/test/scala/mesosphere/marathon/integration/LeaderIntegrationTest.scala index 85d50487125..44315030eee 100644 --- a/src/test/scala/mesosphere/marathon/integration/LeaderIntegrationTest.scala +++ b/src/test/scala/mesosphere/marathon/integration/LeaderIntegrationTest.scala @@ -1,6 +1,6 @@ package mesosphere.marathon.integration -import mesosphere.marathon.integration.setup.{ IntegrationFunSuite, MarathonClusterIntegrationTest, WaitTestSupport } +import mesosphere.marathon.integration.setup.{ ProcessKeeper, IntegrationFunSuite, MarathonClusterIntegrationTest, WaitTestSupport } import org.scalatest.{ GivenWhenThen, Matchers } import scala.concurrent.duration._ @@ -50,4 +50,24 @@ class LeaderIntegrationTest extends IntegrationFunSuite And("the leader must have changed") WaitTestSupport.waitUntil("the leader changes", 30.seconds) { marathon.leader().value != leader } } + + test("abdicate if the zk connection is dropped") { + Given("a leader") + WaitTestSupport.waitUntil("a leader has been elected", 30.seconds) { marathon.leader().code == 200 } + + When("ZooKeeper dies") + ProcessKeeper.stopProcess("zookeeper") + + Then("There is no leader in the cluster") + WaitTestSupport.waitUntil("a leader has been abdicated", 30.seconds) { + //TODO: should it be always 404? 
+ Seq(404, 503).contains(marathon.leader().code) + } + + When("Zookeeper starts again") + ProcessKeeper.startZooKeeper(config.zkPort, "/tmp/foo/single", wipeWorkDir = false) + + Then("A new leader is elected") + WaitTestSupport.waitUntil("a leader has been elected", 30.seconds) { marathon.leader().code == 200 } + } } diff --git a/src/test/scala/mesosphere/marathon/integration/setup/ProcessKeeper.scala b/src/test/scala/mesosphere/marathon/integration/setup/ProcessKeeper.scala index a90810d09f8..d63f7dfd040 100644 --- a/src/test/scala/mesosphere/marathon/integration/setup/ProcessKeeper.scala +++ b/src/test/scala/mesosphere/marathon/integration/setup/ProcessKeeper.scala @@ -27,7 +27,7 @@ import scala.util.{ Failure, Success, Try } object ProcessKeeper { private[this] val log = LoggerFactory.getLogger(getClass.getName) - private[this] var processes = List.empty[Process] + private[this] var processes = Map.empty[String, Process] private[this] var services = List.empty[Service] private[this] val ENV_MESOS_WORK_DIR: String = "MESOS_WORK_DIR" @@ -42,11 +42,13 @@ object ProcessKeeper { } } - def startZooKeeper(port: Int, workDir: String) { + def startZooKeeper(port: Int, workDir: String, wipeWorkDir: Boolean = true) { val args = "-Dzookeeper.jmx.log4j.disable=true" :: "org.apache.zookeeper.server.ZooKeeperServerMain" :: port.toString :: workDir :: Nil val workDirFile = new File(workDir) - FileUtils.deleteDirectory(workDirFile) - FileUtils.forceMkdir(workDirFile) + if (wipeWorkDir) { + FileUtils.deleteDirectory(workDirFile) + FileUtils.forceMkdir(workDirFile) + } startJavaProcess("zookeeper", heapInMegs = 256, args, new File("."), sys.env, _.contains("binding to port")) } @@ -129,7 +131,7 @@ object ProcessKeeper { val upOrExited = Future.firstCompletedOf(Seq(up.future, processExitCode))(ExecutionContext.global) Try(Await.result(upOrExited, timeout)) match { case Success(result) => - processes = process :: processes + processes += name -> process result match { case 
ProcessExited => throw new IllegalStateException(s"Process $name exited before coming up. Give up. $processBuilder") @@ -169,16 +171,25 @@ object ProcessKeeper { } } + def stopProcess(name: String): Option[Int] = { + log.info(s"Stop Process $name") + processes.get(name).map { process => + Try(process.destroy()) + processes -= name + process.exitValue() + } + } + def stopAllProcesses(): Unit = { def waitForProcessesToFinish(): Unit = { - processes.foreach(p => Try(p.destroy())) + processes.values.foreach(p => Try(p.destroy())) // Unfortunately, there seem to be race conditions in Process.exitValue. // Thus this ugly workaround. val waitForExitInThread = new Thread() { override def run(): Unit = { - processes.foreach(_.exitValue()) + processes.values.foreach(_.exitValue()) } } waitForExitInThread.start() @@ -200,7 +211,7 @@ object ProcessKeeper { log.error("giving up waiting for processes to finish", e) } } - processes = Nil + processes = Map.empty } def startService(service: Service): Unit = {