Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Fixes #2509 by listening for connection drops and abdicating directly
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias committed Nov 25, 2015
1 parent fcc6199 commit 1af1487
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 12 deletions.
43 changes: 40 additions & 3 deletions src/main/scala/mesosphere/marathon/MarathonModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import mesosphere.chaos.http.HttpConf
import mesosphere.marathon.Protos.MarathonTask
import mesosphere.marathon.api.LeaderInfo
import mesosphere.marathon.core.launchqueue.LaunchQueue
import mesosphere.marathon.core.leadership.LeadershipCoordinator
import mesosphere.marathon.event.http._
import mesosphere.marathon.event.{ EventModule, HistoryActor }
import mesosphere.marathon.health.{ HealthCheckManager, MarathonHealthCheckManager }
Expand All @@ -34,15 +35,17 @@ import mesosphere.util.state.mesos.MesosStateStore
import mesosphere.util.state.zk.{ CompressionConf, ZKStore }
import mesosphere.util.state.{ FrameworkId, FrameworkIdUtil, PersistentStore, _ }
import org.apache.mesos.state.ZooKeeperState
import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.ZooDefs.Ids
import org.apache.zookeeper.{ WatchedEvent, Watcher, ZooDefs }
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.collection.immutable.Seq
import scala.concurrent.Await
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import scala.collection.immutable.Seq

//scalastyle:off number.of.methods parameter.number

object ModuleNames {
final val CANDIDATE = "CANDIDATE"
Expand Down Expand Up @@ -83,7 +86,6 @@ class MarathonModule(conf: MarathonConf, http: HttpConf, zk: ZooKeeperClient)
bind(classOf[SchedulerDriverFactory]).to(classOf[MesosSchedulerDriverFactory]).in(Scopes.SINGLETON)
bind(classOf[MarathonLeaderInfoMetrics]).in(Scopes.SINGLETON)
bind(classOf[MarathonScheduler]).in(Scopes.SINGLETON)
bind(classOf[MarathonSchedulerService]).in(Scopes.SINGLETON)
bind(classOf[LeaderInfo]).to(classOf[MarathonLeaderInfo]).in(Scopes.SINGLETON)
bind(classOf[TaskFactory]).to(classOf[DefaultTaskFactory]).in(Scopes.SINGLETON)

Expand All @@ -103,6 +105,41 @@ class MarathonModule(conf: MarathonConf, http: HttpConf, zk: ZooKeeperClient)
.toInstance(leader)
}

/**
  * Builds the singleton [[MarathonSchedulerService]] and hooks it up to ZooKeeper connection events.
  *
  * The candidate implementation in use does not react to a lost ZooKeeper connection, so a
  * watcher is registered on the client returned by `zk.get()`. When that connection reports
  * `Disconnected` while the `leader` flag is set, the service abdicates leadership.
  * All other events are ignored.
  *
  * @return the newly created service, returned after the watcher has been registered
  */
@Provides
@Singleton
def provideMarathonSchedulerService(
  zk: ZooKeeperClient,
  leadershipCoordinator: LeadershipCoordinator,
  healthCheckManager: HealthCheckManager,
  @Named(ModuleNames.CANDIDATE) candidate: Option[Candidate],
  config: MarathonConf,
  frameworkIdUtil: FrameworkIdUtil,
  @Named(ModuleNames.LEADER_ATOMIC_BOOLEAN) leader: AtomicBoolean,
  appRepository: AppRepository,
  taskTracker: TaskTracker,
  driverFactory: SchedulerDriverFactory,
  system: ActorSystem,
  migration: Migration,
  @Named("schedulerActor") schedulerActor: ActorRef,
  @Named(EventModule.busName) eventStream: EventStream,
  leadershipCallbacks: Seq[LeadershipCallback] = Seq.empty): MarathonSchedulerService = {

  val schedulerService = new MarathonSchedulerService(
    leadershipCoordinator,
    healthCheckManager,
    candidate,
    config,
    frameworkIdUtil,
    leader,
    appRepository,
    taskTracker,
    driverFactory,
    system,
    migration,
    schedulerActor,
    eventStream,
    leadershipCallbacks)

  // Handle disconnections explicitly: give up leadership as soon as the
  // ZooKeeper connection drops while this instance is the leader.
  val connectionLossWatcher: Watcher = new Watcher {
    override def process(event: WatchedEvent): Unit = {
      val disconnected = event.getState == Watcher.Event.KeeperState.Disconnected
      if (disconnected && leader.get()) schedulerService.abdicateLeadership()
    }
  }
  zk.get().register(connectionLossWatcher)

  schedulerService
}

@Provides
@Singleton
def provideMesosLeaderInfo(): MesosLeaderInfo = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package mesosphere.marathon.integration

import mesosphere.marathon.integration.setup.{ IntegrationFunSuite, MarathonClusterIntegrationTest, WaitTestSupport }
import mesosphere.marathon.integration.setup.{ ProcessKeeper, IntegrationFunSuite, MarathonClusterIntegrationTest, WaitTestSupport }
import org.scalatest.{ GivenWhenThen, Matchers }

import scala.concurrent.duration._
Expand Down Expand Up @@ -50,4 +50,21 @@ class LeaderIntegrationTest extends IntegrationFunSuite
And("the leader must have changed")
WaitTestSupport.waitUntil("the leader changes", 30.seconds) { marathon.leader().value != leader }
}

// Checks that the leader steps down once ZooKeeper is stopped and that
// a leader is elected again after ZooKeeper has been restarted.
test("abdicate if the zk connection is dropped") {
  // Upper bound for each of the waits below.
  val waitLimit = 30.seconds
  // Status code of the leader endpoint, queried anew on every evaluation.
  // The checks below use 200 for "a leader is known" and 404 for "no leader".
  def leaderStatus = marathon.leader().code

  Given("a leader")
  WaitTestSupport.waitUntil("a leader has been elected", waitLimit) { leaderStatus == 200 }

  When("ZooKeeper dies")
  ProcessKeeper.stopProcess("zookeeper")

  Then("There is no leader in the cluster")
  WaitTestSupport.waitUntil("a leader has been abdicated", waitLimit) { leaderStatus == 404 }

  When("Zookeeper starts again")
  // Keep the existing work directory so ZooKeeper comes back with its previous state.
  ProcessKeeper.startZooKeeper(config.zkPort, "/tmp/foo/single", wipeWorkDir = false)

  Then("A new leader is elected")
  WaitTestSupport.waitUntil("a leader has been elected", waitLimit) { leaderStatus == 200 }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scala.util.{ Failure, Success, Try }
object ProcessKeeper {

private[this] val log = LoggerFactory.getLogger(getClass.getName)
private[this] var processes = List.empty[Process]
private[this] var processes = Map.empty[String, Process]
private[this] var services = List.empty[Service]

private[this] val ENV_MESOS_WORK_DIR: String = "MESOS_WORK_DIR"
Expand All @@ -42,11 +42,13 @@ object ProcessKeeper {
}
}

/**
  * Starts a ZooKeeper server through `startJavaProcess` under the name "zookeeper".
  *
  * @param port        port handed to `ZooKeeperServerMain` as its first argument
  * @param workDir     directory handed to `ZooKeeperServerMain` as its second argument
  * @param wipeWorkDir when true (the default), the work directory is deleted and
  *                    recreated before the server is started
  */
def startZooKeeper(port: Int, workDir: String) {
def startZooKeeper(port: Int, workDir: String, wipeWorkDir: Boolean = true) {
// JVM option, main class, then the two program arguments (port and work directory).
val args = "-Dzookeeper.jmx.log4j.disable=true" :: "org.apache.zookeeper.server.ZooKeeperServerMain" :: port.toString :: workDir :: Nil
val workDirFile = new File(workDir)
FileUtils.deleteDirectory(workDirFile)
FileUtils.forceMkdir(workDirFile)
// Only reset the work directory on request, so a restart can reuse what is already there.
if (wipeWorkDir) {
FileUtils.deleteDirectory(workDirFile)
FileUtils.forceMkdir(workDirFile)
}
// NOTE(review): the last argument looks like the "process is up" check on an output line - confirm against startJavaProcess.
startJavaProcess("zookeeper", heapInMegs = 256, args, new File("."), sys.env, _.contains("binding to port"))
}

Expand Down Expand Up @@ -129,7 +131,7 @@ object ProcessKeeper {
val upOrExited = Future.firstCompletedOf(Seq(up.future, processExitCode))(ExecutionContext.global)
Try(Await.result(upOrExited, timeout)) match {
case Success(result) =>
processes = process :: processes
processes += name -> process
result match {
case ProcessExited =>
throw new IllegalStateException(s"Process $name exited before coming up. Give up. $processBuilder")
Expand Down Expand Up @@ -169,16 +171,25 @@ object ProcessKeeper {
}
}

/**
  * Stops the process registered under the given name, if there is one.
  *
  * The process is destroyed (any non-fatal failure of `destroy()` is ignored),
  * removed from the registry, and then asked for its exit value.
  *
  * @param name the name the process was registered under
  * @return the exit value of the stopped process, or `None` if no process with that name is registered
  */
def stopProcess(name: String): Option[Int] = {
  log.info(s"Stop Process $name")
  for (running <- processes.get(name)) yield {
    // Best effort: a failing destroy() must not prevent the bookkeeping below.
    Try(running.destroy())
    processes -= name
    running.exitValue()
  }
}

def stopAllProcesses(): Unit = {

def waitForProcessesToFinish(): Unit = {
processes.foreach(p => Try(p.destroy()))
processes.values.foreach(p => Try(p.destroy()))

// Unfortunately, there seem to be race conditions in Process.exitValue.
// Thus this ugly workaround.
val waitForExitInThread = new Thread() {
override def run(): Unit = {
processes.foreach(_.exitValue())
processes.values.foreach(_.exitValue())
}
}
waitForExitInThread.start()
Expand All @@ -200,7 +211,7 @@ object ProcessKeeper {
log.error("giving up waiting for processes to finish", e)
}
}
processes = Nil
processes = Map.empty
}

def startService(service: Service): Unit = {
Expand Down

0 comments on commit 1af1487

Please sign in to comment.