Ensuring that Spark uncaught exceptions are handled on the Snappy side and do not cause a System.exit #342

Merged
5 commits merged on Sep 9, 2016
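For context, the approach in this PR is to install a JVM-wide default uncaught exception handler on Snappy executors, so that a fatal error in any executor thread triggers an executor restart rather than the System.exit performed by Spark's stock handler. Below is a minimal, self-contained sketch of that pattern, not the actual implementation; the HandlerSketch object and its restartExecutor hook are hypothetical stand-ins for the SnappyCoarseGrainedExecutorBackend / ExecutorInitiator wiring shown in the diff.

object HandlerSketch {

  // Hypothetical restart hook; the real PR routes this through
  // SnappyCoarseGrainedExecutorBackend.exitExecutor, which restarts the
  // executor via ExecutorInitiator instead of exiting the JVM.
  def restartExecutor(reason: String, t: Throwable): Unit =
    println(s"restarting executor: $reason (${t.getClass.getName})")

  def main(args: Array[String]): Unit = {
    // Route every uncaught exception in this JVM to the restart hook
    // instead of a handler that calls System.exit.
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(thread: Thread, exception: Throwable): Unit =
        restartExecutor(s"uncaught exception in thread ${thread.getName}", exception)
    })

    // Simulate the failure injected by the tests below: a thread that dies
    // with a fatal error. With the handler installed the JVM keeps running.
    new Thread() {
      override def run(): Unit = throw new InternalError("simulated failure")
    }.start()

    Thread.sleep(1000)
    println("JVM still alive")
  }
}

With the default handler swapped in, the InternalError thrown from the worker thread is logged and routed to the restart hook and the process stays up; without it the JVM would terminate.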
@@ -51,6 +51,24 @@ class ClusterMgrDUnitTest(s: String) extends ClusterManagerTestBase(s) {
ClusterManagerTestBase.startSnappyLead(ClusterManagerTestBase.locatorPort, bootProps)
}

def testUncaughtExceptionInExecutor(): Unit = {
try {
failTheExecutors
} catch {
case _ : Throwable =>
}
// The executors should have restarted automatically, so this should not hang
startSparkJob()
}

def testUncaughtExceptionInExecutorThread(): Unit = {
vm2.invoke(getClass, "failAThread")
vm1.invoke(getClass, "failAThread")
vm0.invoke(getClass, "failAThread")
// The executors should have restarted automatically, so this should not hang
startSparkJob()
}

def testSnap684(): Unit = {
startSparkJob()
startGemJob()
@@ -76,6 +94,20 @@ object ClusterMgrDUnitTest {
assert(3.25 > pi)
}

def failTheExecutors: Unit = {
sc.parallelize(1 until 100, 5).map { i =>
throw new InternalError()
}.collect()
}

def failAThread: Unit = {
new Thread(){
override def run(): Unit = {
throw new InternalError();
}
}.start()
}

def startGemJob(): Unit = {

val snContext = SnappyContext(sc)
Expand Down
@@ -18,6 +18,7 @@ package org.apache.spark.executor

import java.net.URL

import com.pivotal.gemfirexd.internal.engine.store.GemFireStore
import io.snappydata.cluster.ExecutorInitiator

import org.apache.spark.SparkEnv
@@ -45,17 +46,22 @@ class SnappyCoarseGrainedExecutorBackend(
}

override protected def registerExecutor: Executor =
new SnappyExecutor(executorId, hostName, env, userClassPath, isLocal = false)
new SnappyExecutor(executorId, hostName, env,
userClassPath, new SnappyUncaughtExceptionHandler(this),
isLocal = false)

/**
* Snappy addition (replace System.exit with exitExecutor). We could have
* added functions calling System.exit to SnappyCoarseGrainedExecutorBackend,
* but those would then have to be kept in sync with CoarseGrainedExecutorBackend
* after every merge.
*/
override protected def exitExecutor(code: Int,
override def exitExecutor(code: Int,
reason: String, throwable: Throwable): Unit = {
exitWithoutRestart()
// See if the VM is going down
GemFireStore.getBootingInstance.getGemFireCache.getCancelCriterion.
checkCancelInProgress(null)
// Executor may fail to connect to the driver because of
// https://issues.apache.org/jira/browse/SPARK-9820 and
// https://issues.apache.org/jira/browse/SPARK-8592. To overcome such
@@ -67,6 +73,7 @@ class SnappyCoarseGrainedExecutorBackend(
logError(reasonStr, throwable)
}
ExecutorInitiator.restartExecutor()

}

def exitWithoutRestart(): Unit = {
@@ -22,16 +22,26 @@ import java.net.URL
import com.pivotal.gemfirexd.internal.engine.Misc

import org.apache.spark.SparkEnv
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
import org.apache.spark.internal.Logging
import org.apache.spark.util.SparkUncaughtExceptionHandler._
import org.apache.spark.util.{SparkExitCode, ShutdownHookManager,
ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

class SnappyExecutor(
executorId: String,
executorHostname: String,
env: SparkEnv,
userClassPath: Seq[URL] = Nil,
exceptionHandler: SnappyUncaughtExceptionHandler,
isLocal: Boolean = false)
extends Executor(executorId, executorHostname, env, userClassPath, isLocal) {

if (!isLocal) {
// Setup an uncaught exception handler for non-local mode.
// Make any thread terminations due to uncaught exceptions
// kill the executor component
Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
}
override def createClassLoader(): MutableURLClassLoader = {
// Bootstrap the list of jars with the user class path.
val now = System.currentTimeMillis()
@@ -68,7 +78,6 @@ class SnappyChildFirstURLClassLoader(urls: Array[URL], parent: ClassLoader)
}
}


class SnappyMutableURLClassLoader(urls: Array[URL], parent: ClassLoader)
extends MutableURLClassLoader(urls, parent) {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
@@ -79,4 +88,37 @@ class SnappyMutableURLClassLoader(urls: Array[URL], parent: ClassLoader)
Misc.getMemStore.getDatabase.getClassFactory.loadClassFromDB(name)
}
}
}


/**
* The default uncaught exception handler for Executors
*/
private class SnappyUncaughtExceptionHandler(
val executorBackend: SnappyCoarseGrainedExecutorBackend)
extends Thread.UncaughtExceptionHandler with Logging {

override def uncaughtException(thread: Thread, exception: Throwable) {
try {
// Make it explicit that uncaught exceptions are thrown when container is shutting down.
// It will help users when they analyze the executor logs
val inShutdownMsg = if (ShutdownHookManager.inShutdown()) "[Container in shutdown] " else ""
val errMsg = "Uncaught exception in thread "
logError(inShutdownMsg + errMsg + thread, exception)

// We may have been called from a shutdown hook, there is no need to do anything
if (!ShutdownHookManager.inShutdown()) {
if (exception.isInstanceOf[OutOfMemoryError]) {
executorBackend.exitExecutor(SparkExitCode.OOM, "Out of Memory", exception)
} else {
executorBackend.exitExecutor(
SparkExitCode.UNCAUGHT_EXCEPTION, errMsg, exception)
}
}
} catch {
// Exception while handling an uncaught exception; we cannot do much here
case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
}
}
spark (submodule): 2 changes, 1 addition & 1 deletion
Submodule spark updated from 0313b6 to 280452