From cc0f256c2d6866cee7c9e92422b55d22b8941ca8 Mon Sep 17 00:00:00 2001 From: Hemant Bhanawat Date: Thu, 25 Aug 2016 17:06:07 +0530 Subject: [PATCH 1/3] Dont restart the executor if the cache is closing down. --- .../executor/SnappyCoarseGrainedExecutorBackend.scala | 7 ++++++- spark | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/cluster/src/main/scala/org/apache/spark/executor/SnappyCoarseGrainedExecutorBackend.scala b/cluster/src/main/scala/org/apache/spark/executor/SnappyCoarseGrainedExecutorBackend.scala index af3d9a36bd..4e34dd8c8a 100644 --- a/cluster/src/main/scala/org/apache/spark/executor/SnappyCoarseGrainedExecutorBackend.scala +++ b/cluster/src/main/scala/org/apache/spark/executor/SnappyCoarseGrainedExecutorBackend.scala @@ -18,6 +18,7 @@ package org.apache.spark.executor import java.net.URL +import com.pivotal.gemfirexd.internal.engine.store.GemFireStore import io.snappydata.cluster.ExecutorInitiator import org.apache.spark.SparkEnv @@ -53,9 +54,12 @@ class SnappyCoarseGrainedExecutorBackend( * but those functions will have to be brought in sync with CoarseGrainedExecutorBackend * after every merge. */ - override protected def exitExecutor(code: Int, + override def exitExecutor(code: Int, reason: String, throwable: Throwable): Unit = { exitWithoutRestart() + // See if the VM is going down + GemFireStore.getBootingInstance.getGemFireCache.getCancelCriterion. + checkCancelInProgress(null) // Executor may fail to connect to the driver because of // https://issues.apache.org/jira/browse/SPARK-9820 and // https://issues.apache.org/jira/browse/SPARK-8592. To overcome such @@ -67,6 +71,7 @@ class SnappyCoarseGrainedExecutorBackend( logError(reasonStr, throwable) } ExecutorInitiator.restartExecutor() + } def exitWithoutRestart(): Unit = { diff --git a/spark b/spark index f2d482fe7c..426428d3d0 160000 --- a/spark +++ b/spark @@ -1 +1 @@ -Subproject commit f2d482fe7cc9cd21fdd3bd6a1b8107668f0d80d5 +Subproject commit 426428d3d0ceaa6117e3e84565dd1f719895c6dd From a86fcbb1e54d4e18c9779faded39d67e50cefd85 Mon Sep 17 00:00:00 2001 From: Hemant Bhanawat Date: Thu, 25 Aug 2016 20:36:21 +0530 Subject: [PATCH 2/3] Added a test for testing failure and then restarting of executors. --- .../snappydata/cluster/ClusterMgrDUnitTest.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/cluster/src/dunit/scala/io/snappydata/cluster/ClusterMgrDUnitTest.scala b/cluster/src/dunit/scala/io/snappydata/cluster/ClusterMgrDUnitTest.scala index 249066e0ac..1ab3456f2f 100644 --- a/cluster/src/dunit/scala/io/snappydata/cluster/ClusterMgrDUnitTest.scala +++ b/cluster/src/dunit/scala/io/snappydata/cluster/ClusterMgrDUnitTest.scala @@ -52,6 +52,16 @@ class ClusterMgrDUnitTest(s: String) extends ClusterManagerTestBase(s) { ClusterManagerTestBase.startSnappyLead(ClusterManagerTestBase.locatorPort, bootProps) } + def testExecutorFailure(): Unit = { + try { + failTheExecutors + } catch { + case _ : Throwable => + } + // The executors should have started automatically, so this should not hang + startSparkJob() + } + def testSnap684(): Unit = { startSparkJob() startGemJob() @@ -77,6 +87,12 @@ object ClusterMgrDUnitTest { assert(3.25 > pi) } + def failTheExecutors: Unit = { + sc.parallelize(1 until 100, 5).map { i => + throw new InternalError() + }.collect() + } + def startGemJob(): Unit = { val snContext = SnappyContext(sc) From cb7f1d7f623ba84f16e8763b55438de466eab5e1 Mon Sep 17 00:00:00 2001 From: Hemant Bhanawat Date: Mon, 29 Aug 2016 10:49:40 +0530 Subject: [PATCH 3/3] SnappyUncaughtExceptionHandler is now set as the uncaught exception handler in the executor. SnappyUncaughtExceptionHandler exits the executor component unlike the SparkUncaughtExceptionHandler which does a System.exit. Added a new test to test the uncaughtexception handler. --- .../cluster/ClusterMgrDUnitTest.scala | 18 +++++++- .../SnappyCoarseGrainedExecutorBackend.scala | 4 +- .../spark/executor/SnappyExecutor.scala | 46 ++++++++++++++++++- spark | 2 +- 4 files changed, 65 insertions(+), 5 deletions(-) diff --git a/cluster/src/dunit/scala/io/snappydata/cluster/ClusterMgrDUnitTest.scala b/cluster/src/dunit/scala/io/snappydata/cluster/ClusterMgrDUnitTest.scala index 1ab3456f2f..75c1b0669c 100644 --- a/cluster/src/dunit/scala/io/snappydata/cluster/ClusterMgrDUnitTest.scala +++ b/cluster/src/dunit/scala/io/snappydata/cluster/ClusterMgrDUnitTest.scala @@ -52,7 +52,7 @@ class ClusterMgrDUnitTest(s: String) extends ClusterManagerTestBase(s) { ClusterManagerTestBase.startSnappyLead(ClusterManagerTestBase.locatorPort, bootProps) } - def testExecutorFailure(): Unit = { + def testUncaughtExceptionInExecutor(): Unit = { try { failTheExecutors } catch { @@ -62,6 +62,14 @@ class ClusterMgrDUnitTest(s: String) extends ClusterManagerTestBase(s) { startSparkJob() } + def testUncaughtExceptionInExecutorthread(): Unit = { + vm2.invoke(getClass, "failAThread") + vm1.invoke(getClass, "failAThread") + vm0.invoke(getClass, "failAThread") + // The executors should have started automatically, so this should not hang + startSparkJob() + } + def testSnap684(): Unit = { startSparkJob() startGemJob() @@ -93,6 +101,14 @@ object ClusterMgrDUnitTest { }.collect() } + def failAThread: Unit = { + new Thread(){ + override def run(): Unit = { + throw new InternalError(); + } + }.start() + } + def startGemJob(): Unit = { val snContext = SnappyContext(sc) diff --git a/cluster/src/main/scala/org/apache/spark/executor/SnappyCoarseGrainedExecutorBackend.scala b/cluster/src/main/scala/org/apache/spark/executor/SnappyCoarseGrainedExecutorBackend.scala index 4e34dd8c8a..0f2dc3b5cc 100644 --- a/cluster/src/main/scala/org/apache/spark/executor/SnappyCoarseGrainedExecutorBackend.scala +++ b/cluster/src/main/scala/org/apache/spark/executor/SnappyCoarseGrainedExecutorBackend.scala @@ -46,7 +46,9 @@ class SnappyCoarseGrainedExecutorBackend( } override protected def registerExecutor: Executor = - new SnappyExecutor(executorId, hostName, env, userClassPath, isLocal = false) + new SnappyExecutor(executorId, hostName, env, + userClassPath, new SnappyUncaughtExceptionHandler(this), + isLocal = false) /** * Snappy addition (Replace System.exit with exitExecutor). We could have diff --git a/cluster/src/main/scala/org/apache/spark/executor/SnappyExecutor.scala b/cluster/src/main/scala/org/apache/spark/executor/SnappyExecutor.scala index 9478f95387..6102d0e00a 100644 --- a/cluster/src/main/scala/org/apache/spark/executor/SnappyExecutor.scala +++ b/cluster/src/main/scala/org/apache/spark/executor/SnappyExecutor.scala @@ -22,16 +22,26 @@ import java.net.URL import com.pivotal.gemfirexd.internal.engine.Misc import org.apache.spark.SparkEnv -import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils} +import org.apache.spark.internal.Logging +import org.apache.spark.util.SparkUncaughtExceptionHandler._ +import org.apache.spark.util.{SparkExitCode, ShutdownHookManager, +ChildFirstURLClassLoader, MutableURLClassLoader, Utils} class SnappyExecutor( executorId: String, executorHostname: String, env: SparkEnv, userClassPath: Seq[URL] = Nil, + exceptionHandler: SnappyUncaughtExceptionHandler, isLocal: Boolean = false) extends Executor(executorId, executorHostname, env, userClassPath, isLocal) { + if (!isLocal) { + // Setup an uncaught exception handler for non-local mode. + // Make any thread terminations due to uncaught exceptions + // kill the executor component + Thread.setDefaultUncaughtExceptionHandler(exceptionHandler) + } override def createClassLoader(): MutableURLClassLoader = { // Bootstrap the list of jars with the user class path. val now = System.currentTimeMillis() @@ -68,7 +78,6 @@ class SnappyChildFirstURLClassLoader(urls: Array[URL], parent: ClassLoader) } } - class SnappyMutableURLClassLoader(urls: Array[URL], parent: ClassLoader) extends MutableURLClassLoader(urls, parent) { override def loadClass(name: String, resolve: Boolean): Class[_] = { @@ -79,4 +88,37 @@ class SnappyMutableURLClassLoader(urls: Array[URL], parent: ClassLoader) Misc.getMemStore.getDatabase.getClassFactory.loadClassFromDB(name) } } +} + + +/** + * The default uncaught exception handler for Executors + */ +private class SnappyUncaughtExceptionHandler( + val executorBackend: SnappyCoarseGrainedExecutorBackend) + extends Thread.UncaughtExceptionHandler with Logging { + + override def uncaughtException(thread: Thread, exception: Throwable) { + try { + // Make it explicit that uncaught exceptions are thrown when container is shutting down. + // It will help users when they analyze the executor logs + val inShutdownMsg = if (ShutdownHookManager.inShutdown()) "[Container in shutdown] " else "" + val errMsg = "Uncaught exception in thread " + logError(inShutdownMsg + errMsg + thread, exception) + + // We may have been called from a shutdown hook, there is no need to do anything + if (!ShutdownHookManager.inShutdown()) { + if (exception.isInstanceOf[OutOfMemoryError]) { + executorBackend.exitExecutor(SparkExitCode.OOM, "Out of Memory", exception) + } else { + executorBackend.exitExecutor( + SparkExitCode.UNCAUGHT_EXCEPTION, errMsg, exception) + } + } + } catch { + // Exception while handling an uncaught exception. we cannot do much here + case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM) + case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE) + } + } } \ No newline at end of file diff --git a/spark b/spark index 426428d3d0..0019e39a79 160000 --- a/spark +++ b/spark @@ -1 +1 @@ -Subproject commit 426428d3d0ceaa6117e3e84565dd1f719895c6dd +Subproject commit 0019e39a796c4f235d9d7b44c583190c46d375de