[SPARK-49479][CORE] Use daemon ScheduledThreadPoolExecutor for BarrierCoordinator #47956

Open · wants to merge 1 commit into base: master
Conversation

@jshmchenxi (Contributor) commented Sep 2, 2024

What changes were proposed in this pull request?

Use a daemon ScheduledThreadPoolExecutor instead of a non-daemon one as the timer for BarrierCoordinator.

Why are the changes needed?

In Barrier Execution Mode, the Spark driver JVM can hang around after spark.stop() is called. Although the SparkContext has been shut down, the JVM keeps running. The cause is a non-daemon timer thread named BarrierCoordinator barrier epoch increment timer, which prevents the driver JVM from exiting.
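To illustrate the daemon/non-daemon difference in isolation, here is a minimal, self-contained sketch (not Spark's actual code; the helper name is illustrative) of a ScheduledThreadPoolExecutor backed by a daemon worker thread, which cannot keep the JVM alive on its own:

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}

// Minimal sketch (not Spark's actual code): a ScheduledThreadPoolExecutor whose
// single worker thread is marked as a daemon. A daemon thread cannot keep the
// JVM alive, so a pending, never-cancelled task no longer blocks shutdown.
object DaemonTimerSketch {
  def newDaemonScheduler(name: String): ScheduledThreadPoolExecutor = {
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true) // the key difference from the non-daemon timer
        t
      }
    }
    new ScheduledThreadPoolExecutor(1, factory)
  }

  def main(args: Array[String]): Unit = {
    val timer = newDaemonScheduler("BarrierCoordinator barrier epoch increment timer")
    // Schedule a task far in the future and deliberately never cancel it.
    timer.schedule(new Runnable {
      override def run(): Unit = println("epoch increment")
    }, 1, TimeUnit.HOURS)
    assert(timer.getQueue.size == 1) // the task is queued but has not run
    // With a non-daemon worker thread, returning from main here would leave
    // the JVM hanging; with a daemon thread, the JVM can exit normally.
  }
}
```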

Does this PR introduce any user-facing change?

No

How was this patch tested?

Manual test.

Ran the barrier_example.py script locally using ./bin/spark-submit barrier_example.py. Without this change, the JVM hangs and never exits; with this change, it exits successfully.

Was this patch authored or co-authored using generative AI tooling?

No

@jshmchenxi (Contributor, Author)

This commit depends on #47957 and #47945 for fixing SPARK-49479 in branch 3.5.

@jshmchenxi (Contributor, Author)

@LuciferYang @yaooqinn Please take a look, thanks!

@mridulm (Contributor) commented Sep 4, 2024

BarrierCoordinator.onStop should have terminated it - do we have additional details on why this did not happen ?

Thanks for the reproducible test - let me try to debug with it.

@mridulm (Contributor) commented Sep 4, 2024

So the issue here is the mixing of two different APIs: TimerTask and ScheduledExecutorService.
The original code, which @jiangxb1987 added, used Timer and TimerTask consistently.

Hence, when timerTask.cancel was invoked, the corresponding timer entry was cancelled, and things worked as expected.

In SPARK-46895, Timer was replaced with ScheduledExecutorService in various places, and that is where the bug unfortunately came in.

TimerTask is also a Runnable, so it can be passed to the schedule* methods of ScheduledExecutorService.
Unfortunately, TimerTask.cancel is effectively a no-op when the task was scheduled through a ScheduledExecutorService: cancel only flips TimerTask's internal state flag, which java.util.Timer's own thread consults but an executor never does.

That is what causes the issue here.
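The no-op can be reproduced outside Spark with a short standalone sketch:

```scala
import java.util.TimerTask
import java.util.concurrent.{Executors, TimeUnit}

// Standalone reproduction (not Spark code): TimerTask.cancel() only flips a
// state flag that java.util.Timer's own scheduling thread checks. A
// ScheduledExecutorService never looks at that flag, so the "cancelled"
// task runs anyway.
object CancelNoopSketch {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    @volatile var ran = false
    val task = new TimerTask {
      override def run(): Unit = { ran = true }
    }
    val future = executor.schedule(task, 200, TimeUnit.MILLISECONDS)
    task.cancel() // effectively a no-op under an executor
    Thread.sleep(500)
    assert(ran, "the cancelled TimerTask still ran")
    assert(!future.isCancelled) // from the executor's point of view, nothing was cancelled
    executor.shutdown()
  }
}
```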

To test, the following diff ensures that the attached test that @jshmchenxi provided works fine (it is just a POC, not a complete fix).

diff --git a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
index adce6c3f5ffd..be8a4bd831cc 100644
--- a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark
 
-import java.util.TimerTask
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
 import java.util.function.Consumer
 
 import scala.collection.mutable.{ArrayBuffer, HashSet}
@@ -77,12 +76,23 @@ private[spark] class BarrierCoordinator(
 
   override def onStop(): Unit = {
     try {
+      logInfo("onStop called ...")
       states.forEachValue(1, clearStateConsumer)
+      logInfo("states.forEachValue done ...")
       states.clear()
+      logInfo("states.clear done ...")
       listenerBus.removeListener(listener)
+      logInfo("listenerBus.removeListener done ...")
       ThreadUtils.shutdown(timer)
+      logInfo("ThreadUtils.shutdown done ...")
+    } catch {
+      case th: Throwable =>
+        logInfo("Unexpected ...", th)
+        throw th
     } finally {
+      logInfo("super.onStop called ...")
       super.onStop()
+      logInfo("super.onStop called ... DONE")
     }
   }
 
@@ -117,11 +127,12 @@ private[spark] class BarrierCoordinator(
     private val requestMethods = new HashSet[RequestMethod.Value]
 
     // A timer task that ensures we may timeout for a barrier() call.
-    private var timerTask: TimerTask = null
+    private var timerTask: Runnable = null
+    private var timerTaskFuture: ScheduledFuture[_] = null
 
     // Init a TimerTask for a barrier() call.
     private def initTimerTask(state: ContextBarrierState): Unit = {
-      timerTask = new TimerTask {
+      timerTask = new Runnable {
         override def run(): Unit = state.synchronized {
           // Timeout current barrier() call, fail all the sync requests.
           requesters.foreach(_.sendFailure(new SparkException("The coordinator didn't get all " +
@@ -134,10 +145,10 @@ private[spark] class BarrierCoordinator(
 
     // Cancel the current active TimerTask and release resources.
     private def cancelTimerTask(): Unit = {
-      if (timerTask != null) {
-        timerTask.cancel()
+      if (timerTaskFuture != null) {
+        timerTaskFuture.cancel(true)
         timer.purge()
-        timerTask = null
+        timerTaskFuture = null
       }
     }
 
@@ -173,7 +184,7 @@ private[spark] class BarrierCoordinator(
         // we may timeout for the sync.
         if (requesters.isEmpty) {
           initTimerTask(this)
-          timer.schedule(timerTask, timeoutInSecs, TimeUnit.SECONDS)
+          timerTaskFuture = timer.schedule(timerTask, timeoutInSecs, TimeUnit.SECONDS)
         }
         // Add the requester to array of RPCCallContexts pending for reply.
         requesters += requester
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index c8d6000cd628..e794ae030c38 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import java.util.{Properties, TimerTask}
+import java.util.Properties
 import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
 
 import scala.concurrent.duration._
@@ -72,7 +72,7 @@ class BarrierTaskContext private[spark] (
     logTrace("Current callSite: " + Utils.getCallSite())
 
     val startTime = System.currentTimeMillis()
-    val timerTask = new TimerTask {
+    val timerTask = new Runnable {
       override def run(): Unit = {
         logProgressInfo(
           log"waiting under the global sync since ${MDC(TIME, startTime)}",
@@ -81,7 +81,7 @@ class BarrierTaskContext private[spark] (
       }
     }
     // Log the update of global sync every 1 minute.
-    timer.scheduleAtFixedRate(timerTask, 1, 1, TimeUnit.MINUTES)
+    val future = timer.scheduleAtFixedRate(timerTask, 1, 1, TimeUnit.MINUTES)
 
     try {
       val abortableRpcFuture = barrierCoordinator.askAbortable[Array[String]](
@@ -120,7 +120,7 @@ class BarrierTaskContext private[spark] (
         logProgressInfo(log"failed to perform global sync", Some(startTime))
         throw e
     } finally {
-      timerTask.cancel()
+      future.cancel(true)
       timer.purge()
     }
   }

Note that there are other cases where this happens ... and simply changing to newDaemonSingleThreadScheduledExecutor only masks the problem rather than fixing it: timer tasks continue to run in the background and keep accumulating as more tasks get scheduled.
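The cancellation pattern the POC diff moves toward can be sketched standalone (illustrative names, not Spark's): hold on to the ScheduledFuture returned by schedule() and cancel that, instead of calling TimerTask.cancel().

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Sketch of the future-based pattern (not Spark's actual code): the
// ScheduledFuture is the handle the executor actually honours for
// cancellation, unlike TimerTask's internal flag.
object FutureCancelSketch {
  def main(args: Array[String]): Unit = {
    val timer = Executors.newSingleThreadScheduledExecutor()
    @volatile var fired = false
    val future: ScheduledFuture[_] = timer.schedule(new Runnable {
      override def run(): Unit = { fired = true }
    }, 200, TimeUnit.MILLISECONDS)
    future.cancel(true) // cancellation the executor respects
    Thread.sleep(500)
    assert(!fired, "the task was cancelled before it could run")
    assert(future.isCancelled)
    timer.shutdown()
  }
}
```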

CC @jiangxb1987, who wrote the initial impl, and @LuciferYang, who merged SPARK-46895, for thoughts.

…rCoordinator

It is observed that when using the Spark Torch Distributor, the Spark driver pod can hang around after spark.stop() is called. Although the SparkContext was shut down, the JVM was still running.

The reason was a non-daemon timer thread named BarrierCoordinator barrier epoch increment timer, which prevented the driver JVM from stopping. In SPARK-46895 the timer was replaced with a non-daemon single-thread scheduled executor, but the issue still exists.

We should use a daemon single-thread scheduled executor instead.
@jshmchenxi force-pushed the SPARK-49479/use-non-daemon-timer-BarrierCoordinator branch from 43d7636 to afbb58a on September 11, 2024 07:06