Merge pull request #7 from markhamstra/master-csd
SPY-302 backporting of SPARK-1620, SPARK-1685, SPARK-1686, SPARK-1772
jhartlaub committed May 14, 2014
2 parents 49621ec + 76dc266 commit cdea0a1
Showing 12 changed files with 163 additions and 70 deletions.
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -218,8 +218,8 @@ class SparkContext(
} catch {
// TODO: Enumerate the exact reasons why it can fail
// But irrespective of it, it means we cannot proceed !
- case th: Throwable => {
- throw new SparkException("YARN mode not available ?", th)
+ case e: Exception => {
+ throw new SparkException("YARN mode not available ?", e)
}
}
val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem)
@@ -233,8 +233,8 @@ class SparkContext(
cons.newInstance(this).asInstanceOf[ClusterScheduler]

} catch {
- case th: Throwable => {
- throw new SparkException("YARN mode not available ?", th)
+ case e: Exception => {
+ throw new SparkException("YARN mode not available ?", e)
}
}

@@ -243,8 +243,8 @@ class SparkContext(
val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
cons.newInstance(scheduler, this).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
- case th: Throwable => {
- throw new SparkException("YARN mode not available ?", th)
+ case e: Exception => {
+ throw new SparkException("YARN mode not available ?", e)
}
}

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -265,7 +265,6 @@ private[spark] object PythonRDD {
}
} catch {
case eof: EOFException => {}
- case e => throw e
}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -58,13 +58,11 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
try {
new Socket(daemonHost, daemonPort)
} catch {
- case exc: SocketException => {
+ case exc: SocketException =>
logWarning("Python daemon unexpectedly quit, attempting to restart")
stopDaemon()
startDaemon()
new Socket(daemonHost, daemonPort)
- }
- case e => throw e
}
}
}
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -73,7 +73,7 @@ object SparkHadoopUtil {
try {
Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
} catch {
- case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+ case e: Exception => throw new SparkException("Unable to load YARN support", e)
}
} else {
new SparkHadoopUtil
28 changes: 18 additions & 10 deletions core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -33,6 +33,7 @@ import org.apache.spark.Logging
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
+ import org.apache.spark.util.Utils


/**
@@ -61,6 +62,7 @@ private[spark] class Client(
var masterAddress: Address = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
var alreadyDead = false // To avoid calling listener.dead() multiple times
+ var registrationRetryTimer: Option[Cancellable] = None

override def preStart() {
try {
@@ -85,19 +87,21 @@
tryRegisterAllMasters()

var retries = 0
- lazy val retryTimer: Cancellable =
+ registrationRetryTimer = Some {
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
- retries += 1
- if (registered) {
- retryTimer.cancel()
- } else if (retries >= REGISTRATION_RETRIES) {
- logError("All masters are unresponsive! Giving up.")
- markDead()
- } else {
- tryRegisterAllMasters()
+ Utils.tryOrExit {
+ retries += 1
+ if (registered) {
+ registrationRetryTimer.foreach(_.cancel())
+ } else if (retries >= REGISTRATION_RETRIES) {
+ logError("All masters are unresponsive! Giving up.")
+ markDead()
+ } else {
+ tryRegisterAllMasters()
+ }
}
}
- retryTimer // start timer
+ }
}

def changeMaster(url: String) {
@@ -174,6 +178,10 @@ private[spark] class Client(
alreadyDead = true
}
}

+ override def postStop() {
+ registrationRetryTimer.foreach(_.cancel())
+ }
}

def start() {
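The Utils.tryOrExit wrapper used in the retry timer above (and again in Worker.scala and BlockManager.scala below) is the SPARK-1620 part of this backport. The Utils.scala hunk itself is not among the files shown on this page, so the following is only a sketch of the assumed helper, modeled on the upstream change: any throwable escaping a block run on the Akka scheduler is forwarded to the process-terminating uncaught-exception handler instead of silently killing the timer thread.

package org.apache.spark.util

import org.apache.spark.executor.ExecutorUncaughtExceptionHandler

private[spark] object Utils {
  // Sketch only: the real method is part of the Utils.scala change that this
  // page does not display; the body below is assumed from upstream SPARK-1620.
  def tryOrExit(block: => Unit) {
    try {
      block
    } catch {
      // Forward to the process-killing handler so a failed scheduler tick
      // (e.g. the registration retry above) can no longer die silently.
      case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
    }
  }
}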
15 changes: 12 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -85,6 +85,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act

var leaderElectionAgent: ActorRef = _

+ private var recoveryCompletionTask: Cancellable = _

// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
@@ -128,6 +130,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}

override def postStop() {
+ // prevent the CompleteRecovery message sending to restarted master
+ if (recoveryCompletionTask != null) {
+ recoveryCompletionTask.cancel()
+ }
webUi.stop()
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
@@ -147,10 +153,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act

if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedWorkers)
- context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
+ recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
+ CompleteRecovery)
}
}

+ case CompleteRecovery => completeRecovery()

case RevokedLeadership => {
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
@@ -350,15 +359,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
- def schedule() {
+ private def schedule() {
if (state != RecoveryState.ALIVE) { return }
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
// Try to spread out each app among all the nodes, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
- .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+ .filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
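The CompleteRecovery message scheduled and matched in the Master hunks above is not defined in any file shown on this page; in upstream Spark it lives with the master's other internal messages, so the sketch below assumes a case object in MasterMessages. Delivering recovery completion as a message keeps completeRecovery() on the actor's own mailbox, and the recoveryCompletionTask handle lets postStop() cancel it so a restarted master never sees a stale CompleteRecovery.

package org.apache.spark.deploy.master

private[master] object MasterMessages {
  // Assumed definition (the defining file is not shown here). The master
  // schedules this message to itself WORKER_TIMEOUT ms after entering
  // RECOVERING, so completeRecovery() runs on the actor's own thread rather
  // than inside the scheduler callback.
  case object CompleteRecovery
}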
26 changes: 15 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -85,6 +85,8 @@ private[spark] class Worker(
val metricsSystem = MetricsSystem.createMetricsSystem("worker")
val workerSource = new WorkerSource(this)

+ var registrationRetryTimer: Option[Cancellable] = None

def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed

@@ -144,21 +146,22 @@

def registerWithMaster() {
tryRegisterAllMasters()

var retries = 0
- lazy val retryTimer: Cancellable =
+ registrationRetryTimer = Some {
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
- retries += 1
- if (registered) {
- retryTimer.cancel()
- } else if (retries >= REGISTRATION_RETRIES) {
- logError("All masters are unresponsive! Giving up.")
- System.exit(1)
- } else {
- tryRegisterAllMasters()
+ Utils.tryOrExit {
+ retries += 1
+ if (registered) {
+ registrationRetryTimer.foreach(_.cancel())
+ } else if (retries >= REGISTRATION_RETRIES) {
+ logError("All masters are unresponsive! Giving up.")
+ System.exit(1)
+ } else {
+ tryRegisterAllMasters()
+ }
}
}
- retryTimer // start timer
+ }
}

override def receive = {
@@ -260,6 +263,7 @@
}

override def postStop() {
+ registrationRetryTimer.foreach(_.cancel())
executors.values.foreach(_.kill())
webUi.stop()
metricsSystem.stop()
38 changes: 11 additions & 27 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -79,28 +79,7 @@ private[spark] class Executor(
// Setup an uncaught exception handler for non-local mode.
// Make any thread terminations due to uncaught exceptions kill the entire
// executor process to avoid surprising stalls.
- Thread.setDefaultUncaughtExceptionHandler(
- new Thread.UncaughtExceptionHandler {
- override def uncaughtException(thread: Thread, exception: Throwable) {
- try {
- logError("Uncaught exception in thread " + thread, exception)
-
- // We may have been called from a shutdown hook. If so, we must not call System.exit().
- // (If we do, we will deadlock.)
- if (!Utils.inShutdown()) {
- if (exception.isInstanceOf[OutOfMemoryError]) {
- System.exit(ExecutorExitCode.OOM)
- } else {
- System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
- }
- }
- } catch {
- case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
- case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
- }
- }
- }
- )
+ Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
}

val executorSource = new ExecutorSource(this, executorId)
@@ -258,6 +237,11 @@
}

case t: Throwable => {
+ // Attempt to exit cleanly by informing the driver of our failure.
+ // If anything goes wrong (or this was a fatal exception), we will delegate to
+ // the default uncaught exception handler, which will terminate the Executor.
+ logError("Exception in task ID " + taskId, t)

val serviceTime = (System.currentTimeMillis() - taskStart).toInt
val metrics = attemptedTask.flatMap(t => t.metrics)
for (m <- metrics) {
@@ -267,11 +251,11 @@
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

- // TODO: Should we exit the whole executor here? On the one hand, the failed task may
- // have left some weird state around depending on when the exception was thrown, but on
- // the other hand, maybe we could detect that when future tasks fail and exit then.
- logError("Exception in task ID " + taskId, t)
- //System.exit(1)
+ // Don't forcibly exit unless the exception was inherently fatal, to avoid
+ // stopping other tasks unnecessarily.
+ if (Utils.isFatalError(t)) {
+ ExecutorUncaughtExceptionHandler.uncaughtException(t)
+ }
}
} finally {
runningTasks.remove(taskId)
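Utils.isFatalError, used in the new failure branch above, is also defined outside the files shown here (it belongs to the SPARK-1772 part of this backport). A sketch of the assumed check: anything scala.util.control.NonFatal matches, plus interrupts and Scala control-flow throwables, is treated as an ordinary task failure, while everything else, such as OutOfMemoryError, is escalated to ExecutorUncaughtExceptionHandler.

package org.apache.spark.util

import scala.util.control.{ControlThrowable, NonFatal}

private[spark] object Utils {
  // Sketch only: the Utils.scala hunk adding this method is not shown above.
  def isFatalError(e: Throwable): Boolean = e match {
    // Ordinary exceptions, interrupts, and Scala control flow are non-fatal;
    // the task simply reports an ExceptionFailure to the driver.
    case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
      false
    // Everything else (OutOfMemoryError, LinkageError, ...) should take the
    // executor down via the uncaught-exception handler.
    case _ =>
      true
  }
}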
core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala (new file)
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import org.apache.spark.Logging
import org.apache.spark.util.Utils

/**
* The default uncaught exception handler for Executors terminates the whole process, to avoid
* getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
* to fail fast when things go wrong.
*/
private[spark] object ExecutorUncaughtExceptionHandler
extends Thread.UncaughtExceptionHandler with Logging {

override def uncaughtException(thread: Thread, exception: Throwable) {
try {
logError("Uncaught exception in thread " + thread, exception)

// We may have been called from a shutdown hook. If so, we must not call System.exit().
// (If we do, we will deadlock.)
if (!Utils.inShutdown()) {
if (exception.isInstanceOf[OutOfMemoryError]) {
System.exit(ExecutorExitCode.OOM)
} else {
System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
}
}
} catch {
case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
}

def uncaughtException(exception: Throwable) {
uncaughtException(Thread.currentThread(), exception)
}
}
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -128,7 +128,7 @@ private[spark] class BlockManager(
BlockManagerWorker.startBlockManagerWorker(this)
if (!BlockManager.getDisableHeartBeatsForTesting) {
heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
- heartBeat()
+ Utils.tryOrExit { heartBeat() }
}
}
}
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -125,14 +125,14 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
private def addShutdownHook() {
localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
- override def run() {
+ override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
localDirs.foreach { localDir =>
try {
if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
} catch {
- case t: Throwable =>
- logError("Exception while deleting local spark dir: " + localDir, t)
+ case e: Exception =>
+ logError("Exception while deleting local spark dir: " + localDir, e)
}
}

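Utils.logUncaughtExceptions, wrapped around the shutdown hook above, is likewise part of the Utils.scala change this page does not display. A sketch of the assumed helper: unlike tryOrExit it only logs and rethrows, because a JVM shutdown hook must not call System.exit() (as the old Executor handler's comment noted, doing so would deadlock).

package org.apache.spark.util

import scala.util.control.ControlThrowable

import org.apache.spark.Logging

private[spark] object Utils extends Logging {
  // Sketch only: assumed shape of the helper referenced by the shutdown hook.
  def logUncaughtExceptions[T](f: => T): T = {
    try {
      f
    } catch {
      case ct: ControlThrowable =>
        throw ct  // never swallow Scala control-flow throwables
      case t: Throwable =>
        // Make the failure visible, then rethrow; exiting here is unsafe.
        logError("Uncaught exception in thread " + Thread.currentThread().getName, t)
        throw t
    }
  }
}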
(Remaining changed files in this commit are not shown.)
