Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #4066: shut down executors when IORuntime.global shuts down #4067

Merged
merged 3 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,15 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type
def global: IORuntime = {
if (_global == null) {
installGlobal {
val (compute, _) = createWorkStealingComputeThreadPool()
val (blocking, _) = createDefaultBlockingExecutionContext()
val (compute, computeDown) = createWorkStealingComputeThreadPool()
val (blocking, blockingDown) = createDefaultBlockingExecutionContext()
val shutdown = () => {
computeDown()
blockingDown()
resetGlobal()
}

IORuntime(compute, blocking, compute, () => resetGlobal(), IORuntimeConfig())
IORuntime(compute, blocking, compute, shutdown, IORuntimeConfig())
}
}

Expand Down
7 changes: 7 additions & 0 deletions tests/jvm/src/test/scala/cats/effect/IOAppSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ class IOAppSpec extends Specification {
ok
}

"shut down WSTP on fatal error without IOApp" in {
val h = platform(FatalErrorShutsDownRt, List.empty)
h.awaitStatus()
h.stdout() must not(contain("sadness"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That wasn't me, I've copied it from another test 😄

h.stdout() must contain("done")
}

"support main thread evaluation" in {
val h = platform(EvalOnMainThread, List.empty)
h.awaitStatus() mustEqual 0
Expand Down
33 changes: 33 additions & 0 deletions tests/shared/src/main/scala/catseffect/examples.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import cats.effect.std.{Console, Random}
import cats.effect.unsafe.{IORuntime, IORuntimeConfig, Scheduler}
import cats.syntax.all._

import scala.concurrent.Await
import scala.concurrent.duration._

package examples {

import java.util.concurrent.TimeoutException

object HelloWorld extends IOApp.Simple {
def run: IO[Unit] =
IO(println("Hello, World!"))
Expand Down Expand Up @@ -58,6 +61,36 @@ package examples {
}
}

object FatalErrorShutsDownRt extends RawApp {
def main(args: Array[String]): Unit = {
val rt = cats.effect.unsafe.IORuntime.global
@volatile var thread: Thread = null
val action = for {
// make sure a blocking thread exists, save it:
_ <- IO.blocking {
thread = Thread.currentThread()
}
// get back on the WSTP:
_ <- IO.cede
// fatal error on the WSTP thread:
_ <- IO {
throw new OutOfMemoryError("Boom!")
}.attempt.flatMap(_ => IO.println("sadness (attempt)"))
} yield ()
val fut = action.unsafeToFuture()(rt)
try {
Await.ready(fut, atMost = 2.seconds)
} catch {
case _: TimeoutException => println("sadness (timeout)")
}
Thread.sleep(500L)
// by now the WSTP (and all its threads) must've been shut down:
if (thread eq null) println("sadness (thread is null)")
else if (thread.isAlive()) println("sadness (thread is alive)")
println("done")
}
}

object RaiseFatalErrorAttempt extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
IO.raiseError[Unit](new OutOfMemoryError("Boom!"))
Expand Down
Loading