From e564a71ba60d13e394f437cb64a975f087142f74 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Mon, 22 Jan 2024 20:46:32 +0100 Subject: [PATCH 01/17] Add failing Supervisor tests --- .../cats/effect/std/SupervisorSpec.scala | 50 ++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index 0ce590a6a3..ca667e808e 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -18,10 +18,13 @@ package cats.effect package std import org.specs2.specification.core.Fragments +import org.specs2.matcher.MatchResult + +import cats.syntax.all._ import scala.concurrent.duration._ -class SupervisorSpec extends BaseSpec { +class SupervisorSpec extends BaseSpec with DetectPlatform { "Supervisor" should { "concurrent" >> { @@ -229,5 +232,50 @@ class SupervisorSpec extends BaseSpec { // if this doesn't work properly, the test will hang test.start.flatMap(_.join).as(ok).timeoutTo(2.seconds, IO(false must beTrue)) } + + "supervise / finalize race" in real { + val tsk = IO.uncancelable { poll => + constructor(false, None).allocated.flatMap { + case (supervisor, close) => + supervisor.supervise(IO.never[Unit]).replicateA(100).flatMap { fibers => + IO.ref(false).flatMap { _ => + IO.both(supervisor.supervise(IO.never[Unit]), close).flatMap { + case (fiber, _) => + def joinAndCheck( + fib: Fiber[IO, Throwable, Unit]): IO[MatchResult[Boolean]] = + fib.join.flatMap { oc => IO(oc.isCanceled must beTrue) } + poll(fibers.traverse(joinAndCheck) *> joinAndCheck(fiber)) + } + } + } + } + } + tsk.parReplicateA_(if (isJVM) 1000 else 5).as(ok) + } + + "submit to closed supervisor" in real { + constructor(false, None).use(IO.pure(_)).flatMap { leaked => + leaked.supervise(IO.unit).attempt.flatMap { r => + IO(r must beLeft(beAnInstanceOf[IllegalStateException])) + } + } + } + + "restart / cancel race" in real { + val tsk = constructor(false, Some(_ => true)).use { supervisor => + IO.ref(0).flatMap { counter => + supervisor.supervise(counter.update(_ + 1) *> IO.canceled).flatMap { adaptedFiber => + IO.sleep(100.millis) *> adaptedFiber.cancel *> adaptedFiber.join *> ( + (counter.get, IO.sleep(100.millis) *> counter.get).flatMapN { + case (v1, v2) => + IO((v1: Int) mustEqual (v2: Int)) + } + ) + } + } + } + + tsk.parReplicateA_(if (isJVM) 1000 else 5).as(ok) + } } } From e29eb54b06846fd5ccbc83f48535909c62cb056e Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Tue, 23 Jan 2024 00:10:16 +0100 Subject: [PATCH 02/17] Add ugly test for the bug that was fixed in #1670 --- .../main/scala/cats/effect/std/Supervisor.scala | 3 +++ .../scala/cats/effect/std/SupervisorSpec.scala | 14 ++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index 91423263a4..aa43a0a2ac 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -232,6 +232,9 @@ object Supervisor { cleanup = state.remove(token) fiber <- monitor(fa, done.set(true) >> cleanup) _ <- state.add(token, fiber) + // `cleanup` could run *before* the previous line + // (if `fa` is very fast), in which case it doesn't + // remove the fiber from the state, so we re-check: _ <- done.get.ifM(cleanup, F.unit) } yield fiber } diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index ca667e808e..bf16b1ecd2 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -24,8 +24,12 @@ import cats.syntax.all._ import scala.concurrent.duration._ +import java.lang.ref.WeakReference + class SupervisorSpec extends BaseSpec with DetectPlatform { + sequential + "Supervisor" should { "concurrent" >> { supervisorTests(Supervisor.applyForConcurrent) @@ -277,5 +281,15 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { tsk.parReplicateA_(if (isJVM) 1000 else 5).as(ok) } + + "fiber finish / state.add race" in real { // was fixed in #1670 + val tsk = constructor(false, None).use { supervisor => + supervisor.supervise(IO.unit).flatTap(_.joinWithNever).map(new WeakReference(_)).flatMap { fiberWr => + (IO(System.gc()) *> IO.cede).whileM_(IO(fiberWr.get() ne null)).as(ok) + } + } + + tsk.parReplicateA_(if (isJVM) 1000 else 1).as(ok) + } } } From 8690874f110a2b408f89c07b08594482e388e058 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 24 Jan 2024 01:02:13 +0100 Subject: [PATCH 03/17] Started work on Supervisor itself --- .../scala/cats/effect/std/Supervisor.scala | 148 +++++++++++------- .../cats/effect/std/SupervisorSpec.scala | 41 +++-- 2 files changed, 118 insertions(+), 71 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index aa43a0a2ac..eaa08a6433 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -19,6 +19,7 @@ package cats.effect.std import cats.effect.kernel._ import cats.effect.kernel.implicits._ import cats.syntax.all._ +import cats.MonadError import scala.collection.mutable.ListBuffer @@ -140,14 +141,14 @@ object Supervisor { private trait State[F[_]] { def remove(token: Unique.Token): F[Unit] - def add(token: Unique.Token, fiber: Fiber[F, Throwable, _]): F[Unit] + def add(token: Unique.Token, fiber: Fiber[F, Throwable, _]): F[Boolean] // run all the finalizers val joinAll: F[Unit] val cancelAll: F[Unit] } private def supervisor[F[_]]( - mkState: F[State[F]], + mkState: Ref[F, Boolean] => F[State[F]], await: Boolean, checkRestart: Option[Outcome[F, Throwable, _] => Boolean])( implicit F: Concurrent[F]): Resource[F, Supervisor[F]] = { @@ -155,7 +156,7 @@ object Supervisor { // intertwined with resource management for { doneR <- Resource.eval(F.ref(false)) - state <- Resource.makeCase(mkState) { + state <- Resource.makeCase(mkState(doneR)) { case (st, Resource.ExitCase.Succeeded) if await => doneR.set(true) >> st.joinAll case (st, _) => doneR.set(true) >> { /*println("canceling all!");*/ @@ -170,53 +171,64 @@ object Supervisor { case Some(restart) => { (fa, fin) => F.deferred[Outcome[F, Throwable, A]] flatMap { resultR => F.ref(false) flatMap { canceledR => - F.deferred[Ref[F, Fiber[F, Throwable, A]]] flatMap { currentR => - lazy val action: F[Unit] = F uncancelable { _ => - val started = F start { - fa guaranteeCase { oc => - canceledR.get flatMap { canceled => - doneR.get flatMap { done => - if (!canceled && !done && restart(oc)) - action.void - else - fin.guarantee(resultR.complete(oc).void) + F.deferred[Fiber[F, Throwable, A]].flatMap { firstCurrent => + F.ref(firstCurrent).flatMap { currentR => + def action(current: Deferred[F, Fiber[F, Throwable, A]]): F[Unit] = + F uncancelable { _ => + val started = F start { + fa guaranteeCase { oc => + F.deferred[Fiber[F, Throwable, A]].flatMap { newCurrent => + currentR.set(newCurrent) *> { + canceledR.get flatMap { canceled => + doneR.get flatMap { done => + if (!canceled && !done && restart(oc)) + action(newCurrent) + else + newCurrent.complete(null) *> fin.guarantee( + resultR.complete(oc).void) + } + } + } + } } } - } - } - - started flatMap { f => - lazy val loop: F[Unit] = currentR.tryGet flatMap { - case Some(inner) => - inner.set(f) - case None => - F.ref(f) - .flatMap(inner => currentR.complete(inner).ifM(F.unit, loop)) + started flatMap { f => current.complete(f).void } } - loop - } - } - - action map { _ => - new Fiber[F, Throwable, A] { - private[this] val delegateF = currentR.get.flatMap(_.get) - - val cancel: F[Unit] = F uncancelable { _ => - canceledR.set(true) >> delegateF flatMap { fiber => - fiber.cancel >> fiber.join flatMap { - case Outcome.Canceled() => - resultR.complete(Outcome.Canceled()).void - - case _ => - resultR.tryGet.map(_.isDefined).ifM(F.unit, cancel) + action(firstCurrent).as( + new Fiber[F, Throwable, A] { + + private[this] val delegateF = currentR.get.flatMap(_.get) + + val cancel: F[Unit] = F uncancelable { _ => + canceledR.set(true) *> delegateF flatMap { + case null => + resultR.get.void + case fiber => + fiber.cancel *> fiber.join flatMap { + case Outcome.Canceled() => + // double-check: + delegateF.flatMap { + case null => + resultR.get.void + case fiber2 => + if (fiber2 eq fiber) { + fin.guarantee( + resultR.complete(Outcome.Canceled()).void) + } else { + F.raiseError(new AssertionError("unexpected fiber")) + } + } + case _ => + resultR.tryGet.map(_.isDefined).ifM(F.unit, cancel) + } } } - } - val join = resultR.get - } + def join = resultR.get + } + ) } } } @@ -227,64 +239,82 @@ object Supervisor { } for { - done <- F.ref(false) + taskDone <- F.ref(false) + insertDone <- F.deferred[Boolean] token <- F.unique cleanup = state.remove(token) - fiber <- monitor(fa, done.set(true) >> cleanup) - _ <- state.add(token, fiber) - // `cleanup` could run *before* the previous line + fiber <- monitor( + insertDone.get.ifM(fa, F.canceled *> F.never[A]), + taskDone.set(true) >> cleanup + ) + insertSuccessful <- state.add(token, fiber) + _ <- insertDone.complete(insertSuccessful) + // `cleanup` could run *before* the `state.add` // (if `fa` is very fast), in which case it doesn't // remove the fiber from the state, so we re-check: - _ <- done.get.ifM(cleanup, F.unit) + _ <- taskDone.get.ifM(cleanup, F.unit) + _ <- if (!insertSuccessful) raiseClosedError[F] else F.unit } yield fiber } } } + private[this] def raiseClosedError[F[_]](implicit F: MonadError[F, Throwable]): F[Unit] = + F.raiseError(new IllegalStateException("supervisor already shutdown")) + private[effect] def applyForConcurrent[F[_]]( await: Boolean, checkRestart: Option[Outcome[F, Throwable, _] => Boolean])( implicit F: Concurrent[F]): Resource[F, Supervisor[F]] = { val mkState = F.ref[Map[Unique.Token, Fiber[F, Throwable, _]]](Map.empty).map { stateRef => new State[F] { - def remove(token: Unique.Token): F[Unit] = stateRef.update(_ - token) - def add(token: Unique.Token, fiber: Fiber[F, Throwable, _]): F[Unit] = - stateRef.update(_ + (token -> fiber)) + + def remove(token: Unique.Token): F[Unit] = stateRef.update { + case null => null + case map => map.removed(token) + } + + def add(token: Unique.Token, fiber: Fiber[F, Throwable, _]): F[Boolean] = + stateRef.modify { + case null => (null, false) + case map => (map.updated(token, fiber), true) + } private[this] val allFibers: F[List[Fiber[F, Throwable, _]]] = - stateRef.get.map(_.values.toList) + stateRef.getAndSet(null).map(_.values.toList) val joinAll: F[Unit] = allFibers.flatMap(_.traverse_(_.join.void)) + val cancelAll: F[Unit] = allFibers.flatMap(_.parUnorderedTraverse(_.cancel).void) } } - supervisor(mkState, await, checkRestart) + supervisor(_ => mkState, await, checkRestart) } private[effect] def applyForAsync[F[_]]( await: Boolean, checkRestart: Option[Outcome[F, Throwable, _] => Boolean])( implicit F: Async[F]): Resource[F, Supervisor[F]] = { - val mkState = F.delay { + def mkState(doneR: Ref[F, Boolean]) = F.delay { val state = new ConcurrentHashMap[Unique.Token, Fiber[F, Throwable, _]] new State[F] { def remove(token: Unique.Token): F[Unit] = F.delay(state.remove(token)).void - def add(token: Unique.Token, fiber: Fiber[F, Throwable, _]): F[Unit] = - F.delay(state.put(token, fiber)).void + def add(token: Unique.Token, fiber: Fiber[F, Throwable, _]): F[Boolean] = + F.delay(state.put(token, fiber)) *> doneR.get.map(!_) private[this] val allFibers: F[List[Fiber[F, Throwable, _]]] = F delay { - val fibersToCancel = ListBuffer.empty[Fiber[F, Throwable, _]] - fibersToCancel.sizeHint(state.size()) + val fibers = ListBuffer.empty[Fiber[F, Throwable, _]] + fibers.sizeHint(state.size()) val values = state.values().iterator() while (values.hasNext) { - fibersToCancel += values.next() + fibers += values.next() } - fibersToCancel.result() + fibers.result() } val joinAll: F[Unit] = allFibers.flatMap(_.traverse_(_.join.void)) diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index bf16b1ecd2..cfbd36bc7b 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -18,7 +18,6 @@ package cats.effect package std import org.specs2.specification.core.Fragments -import org.specs2.matcher.MatchResult import cats.syntax.all._ @@ -30,6 +29,8 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { sequential + override def executionTimeout = 120.seconds + "Supervisor" should { "concurrent" >> { supervisorTests(Supervisor.applyForConcurrent) @@ -243,18 +244,30 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { case (supervisor, close) => supervisor.supervise(IO.never[Unit]).replicateA(100).flatMap { fibers => IO.ref(false).flatMap { _ => - IO.both(supervisor.supervise(IO.never[Unit]), close).flatMap { - case (fiber, _) => - def joinAndCheck( - fib: Fiber[IO, Throwable, Unit]): IO[MatchResult[Boolean]] = + val tryFork = supervisor.supervise(IO.never[Unit]).map(Some(_)).recover { + case ex: IllegalStateException + if ex.getMessage == "supervisor already shutdown" => + None + } + IO.both(tryFork, close).flatMap { + case (maybeFiber, _) => + def joinAndCheck(fib: Fiber[IO, Throwable, Unit]) = fib.join.flatMap { oc => IO(oc.isCanceled must beTrue) } - poll(fibers.traverse(joinAndCheck) *> joinAndCheck(fiber)) + poll(fibers.traverse(joinAndCheck) *> { + maybeFiber match { + case None => + IO.unit + case Some(fiber) => + // `supervise` won the race, so our fiber must've been cancelled: + joinAndCheck(fiber) + } + }) } } } } } - tsk.parReplicateA_(if (isJVM) 1000 else 5).as(ok) + tsk.parReplicateA_(if (isJVM) 10000 else 5).as(ok) } "submit to closed supervisor" in real { @@ -279,17 +292,21 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { } } - tsk.parReplicateA_(if (isJVM) 1000 else 5).as(ok) + tsk.parReplicateA_(if (isJVM) 10000 else 5).as(ok) } "fiber finish / state.add race" in real { // was fixed in #1670 val tsk = constructor(false, None).use { supervisor => - supervisor.supervise(IO.unit).flatTap(_.joinWithNever).map(new WeakReference(_)).flatMap { fiberWr => - (IO(System.gc()) *> IO.cede).whileM_(IO(fiberWr.get() ne null)).as(ok) - } + supervisor + .supervise(IO.unit) + .flatTap(_.joinWithNever) + .map(new WeakReference(_)) + .flatMap { fiberWr => + (IO(System.gc()) *> IO.sleep(100.millis)).whileM_(IO(fiberWr.get() ne null)).as(ok) + } } - tsk.parReplicateA_(if (isJVM) 1000 else 1).as(ok) + tsk.parReplicateA_(if (isJVM) 2000 else 1).as(ok) } } } From 938fd5dfd5b2869a158fb2b92effa85494349218 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 24 Jan 2024 01:42:24 +0100 Subject: [PATCH 04/17] Remove unneeded code --- .../cats/effect/std/SupervisorSpec.scala | 38 +++++++++---------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index cfbd36bc7b..671344a5bf 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -243,26 +243,24 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { constructor(false, None).allocated.flatMap { case (supervisor, close) => supervisor.supervise(IO.never[Unit]).replicateA(100).flatMap { fibers => - IO.ref(false).flatMap { _ => - val tryFork = supervisor.supervise(IO.never[Unit]).map(Some(_)).recover { - case ex: IllegalStateException - if ex.getMessage == "supervisor already shutdown" => - None - } - IO.both(tryFork, close).flatMap { - case (maybeFiber, _) => - def joinAndCheck(fib: Fiber[IO, Throwable, Unit]) = - fib.join.flatMap { oc => IO(oc.isCanceled must beTrue) } - poll(fibers.traverse(joinAndCheck) *> { - maybeFiber match { - case None => - IO.unit - case Some(fiber) => - // `supervise` won the race, so our fiber must've been cancelled: - joinAndCheck(fiber) - } - }) - } + val tryFork = supervisor.supervise(IO.never[Unit]).map(Some(_)).recover { + case ex: IllegalStateException + if ex.getMessage == "supervisor already shutdown" => + None + } + IO.both(tryFork, close).flatMap { + case (maybeFiber, _) => + def joinAndCheck(fib: Fiber[IO, Throwable, Unit]) = + fib.join.flatMap { oc => IO(oc.isCanceled must beTrue) } + poll(fibers.traverse(joinAndCheck) *> { + maybeFiber match { + case None => + IO.unit + case Some(fiber) => + // `supervise` won the race, so our fiber must've been cancelled: + joinAndCheck(fiber) + } + }) } } } From 0d4f2ba84f405192262d4db53ebd467651eb342e Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 24 Jan 2024 02:18:05 +0100 Subject: [PATCH 05/17] More Supervisor tests --- .../cats/effect/std/SupervisorSpec.scala | 35 +++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index 671344a5bf..328fe3217b 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -238,12 +238,43 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { test.start.flatMap(_.join).as(ok).timeoutTo(2.seconds, IO(false must beTrue)) } + "self-cancel loop" in real { + IO.ref(0L).flatMap { counter => + constructor(true, Some(_ => true)) + .use { supervisor => + val task = counter.update(_ + 1L) *> IO.canceled + supervisor.supervise(task) *> IO.sleep(100.millis) + } + .flatMap { _ => counter.get.flatMap { count => IO(count must beGreaterThan(1L)) } } + } + } + + "lots of simple tasks" in real { + val N = if (isJVM) 10000 else 5 + IO.ref(0L).flatMap { counter => + constructor(true, Some(_ => false)) + .use { supervisor => + val task = counter.update(_ + 1L) + supervisor.supervise(task).parReplicateA(N).void + } + .flatMap { _ => counter.get.flatMap { count => IO(count mustEqual N) } } + } + } + "supervise / finalize race" in real { + superviseFinalizeRace(constructor(false, None), IO.never[Unit]) + } + + "supervise / finalize race with checkRestart" in real { + superviseFinalizeRace(constructor(false, Some(_ => true)), IO.canceled) + } + + def superviseFinalizeRace(mkSupervisor: Resource[IO, Supervisor[IO]], task: IO[Unit]) = { val tsk = IO.uncancelable { poll => - constructor(false, None).allocated.flatMap { + mkSupervisor.allocated.flatMap { case (supervisor, close) => supervisor.supervise(IO.never[Unit]).replicateA(100).flatMap { fibers => - val tryFork = supervisor.supervise(IO.never[Unit]).map(Some(_)).recover { + val tryFork = supervisor.supervise(task).map(Some(_)).recover { case ex: IllegalStateException if ex.getMessage == "supervisor already shutdown" => None From c12623221cd674bf3c06edd762697a26a366ddd4 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 24 Jan 2024 02:55:32 +0100 Subject: [PATCH 06/17] Refactoring and comments --- .../scala/cats/effect/std/Supervisor.scala | 56 ++++++++++++------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index eaa08a6433..867ec08401 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -19,7 +19,6 @@ package cats.effect.std import cats.effect.kernel._ import cats.effect.kernel.implicits._ import cats.syntax.all._ -import cats.MonadError import scala.collection.mutable.ListBuffer @@ -140,9 +139,15 @@ object Supervisor { apply[F](false) private trait State[F[_]] { + def remove(token: Unique.Token): F[Unit] + + /** + * Must return `false` (and may not insert) if `Supervisor` is already closed + */ def add(token: Unique.Token, fiber: Fiber[F, Throwable, _]): F[Boolean] - // run all the finalizers + + // these are allowed to destroy the state, since they're called during closing: val joinAll: F[Unit] val cancelAll: F[Unit] } @@ -152,8 +157,7 @@ object Supervisor { await: Boolean, checkRestart: Option[Outcome[F, Throwable, _] => Boolean])( implicit F: Concurrent[F]): Resource[F, Supervisor[F]] = { - // It would have preferable to use Scope here but explicit cancelation is - // intertwined with resource management + for { doneR <- Resource.eval(F.ref(false)) state <- Resource.makeCase(mkState(doneR)) { @@ -173,7 +177,7 @@ object Supervisor { F.ref(false) flatMap { canceledR => F.deferred[Fiber[F, Throwable, A]].flatMap { firstCurrent => F.ref(firstCurrent).flatMap { currentR => - def action(current: Deferred[F, Fiber[F, Throwable, A]]): F[Unit] = + def action(current: Deferred[F, Fiber[F, Throwable, A]]): F[Unit] = { F uncancelable { _ => val started = F start { fa guaranteeCase { oc => @@ -181,11 +185,12 @@ object Supervisor { currentR.set(newCurrent) *> { canceledR.get flatMap { canceled => doneR.get flatMap { done => - if (!canceled && !done && restart(oc)) + if (!canceled && !done && restart(oc)) { action(newCurrent) - else + } else { newCurrent.complete(null) *> fin.guarantee( resultR.complete(oc).void) + } } } } @@ -195,6 +200,7 @@ object Supervisor { started flatMap { f => current.complete(f).void } } + } action(firstCurrent).as( new Fiber[F, Throwable, A] { @@ -221,7 +227,7 @@ object Supervisor { } } case _ => - resultR.tryGet.map(_.isDefined).ifM(F.unit, cancel) + resultR.get.void } } } @@ -239,29 +245,32 @@ object Supervisor { } for { - taskDone <- F.ref(false) - insertDone <- F.deferred[Boolean] + done <- F.ref(false) + insertResult <- F.deferred[Boolean] token <- F.unique cleanup = state.remove(token) fiber <- monitor( - insertDone.get.ifM(fa, F.canceled *> F.never[A]), - taskDone.set(true) >> cleanup + insertResult.get.ifM(fa, F.canceled *> F.never[A]), + done.set(true) *> cleanup ) - insertSuccessful <- state.add(token, fiber) - _ <- insertDone.complete(insertSuccessful) + insertOk <- state.add(token, fiber) + _ <- insertResult.complete(insertOk) // `cleanup` could run *before* the `state.add` // (if `fa` is very fast), in which case it doesn't // remove the fiber from the state, so we re-check: - _ <- taskDone.get.ifM(cleanup, F.unit) - _ <- if (!insertSuccessful) raiseClosedError[F] else F.unit + _ <- done.get.ifM(cleanup, F.unit) + _ <- { + if (!insertOk) { + F.raiseError(new IllegalStateException("supervisor already shutdown")) + } else { + F.unit + } + } } yield fiber } } } - private[this] def raiseClosedError[F[_]](implicit F: MonadError[F, Throwable]): F[Unit] = - F.raiseError(new IllegalStateException("supervisor already shutdown")) - private[effect] def applyForConcurrent[F[_]]( await: Boolean, checkRestart: Option[Outcome[F, Throwable, _] => Boolean])( @@ -302,8 +311,15 @@ object Supervisor { def remove(token: Unique.Token): F[Unit] = F.delay(state.remove(token)).void - def add(token: Unique.Token, fiber: Fiber[F, Throwable, _]): F[Boolean] = + def add(token: Unique.Token, fiber: Fiber[F, Throwable, _]): F[Boolean] = { + // We might insert a fiber even when closed, but + // then we return `false`, so it will not actually + // execute its task, but will self-cancel. In this + // case we need not remove the (cancelled) fiber + // from the map, since the whole `Supervisor` is + // shutting down anyway. F.delay(state.put(token, fiber)) *> doneR.get.map(!_) + } private[this] val allFibers: F[List[Fiber[F, Throwable, _]]] = F delay { From 83d11ccc8b992048e0427cdc0f0d05510f13cccf Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Fri, 26 Jan 2024 00:37:08 +0100 Subject: [PATCH 07/17] Comments and cleanup --- .../scala/cats/effect/std/Supervisor.scala | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index 867ec08401..4f1cce95bd 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -26,17 +26,17 @@ import java.util.concurrent.ConcurrentHashMap /** * A fiber-based supervisor that monitors the lifecycle of all fibers that are started via its - * interface. The supervisor is managed by a singular fiber to which the lifecycles of all - * spawned fibers are bound. + * interface. The lifecycles of all these spawned fibers are bound to the lifecycle of the + * [[Supervisor]] itself. * * Whereas [[cats.effect.kernel.GenSpawn.background]] links the lifecycle of the spawned fiber * to the calling fiber, starting a fiber via a [[Supervisor]] links the lifecycle of the * spawned fiber to the supervisor fiber. This is useful when the scope of some fiber must * survive the spawner, but should still be confined within some "larger" scope. * - * The fibers started via the supervisor are guaranteed to be terminated when the supervisor - * fiber is terminated. When a supervisor fiber is canceled, all active and queued fibers will - * be safely finalized before finalization of the supervisor is complete. + * The fibers started via the supervisor are guaranteed to be terminated when the supervisor is + * terminated. When a supervisor is finalized, all active and queued fibers will be safely + * finalized before finalization of the supervisor is complete. * * The following diagrams illustrate the lifecycle of a fiber spawned via * [[cats.effect.kernel.GenSpawn.start]], [[cats.effect.kernel.GenSpawn.background]], and @@ -95,6 +95,9 @@ trait Supervisor[F[_]] { /** * Starts the supplied effect `fa` on the supervisor. * + * Trying to start an effect with this method on an already finalized supervisor results in an + * error (inside `F`). + * * @return * a [[cats.effect.kernel.Fiber]] that represents a handle to the started fiber. */ @@ -138,34 +141,33 @@ object Supervisor { def apply[F[_]: Concurrent]: Resource[F, Supervisor[F]] = apply[F](false) - private trait State[F[_]] { + private sealed abstract class State[F[_]] { def remove(token: Unique.Token): F[Unit] /** - * Must return `false` (and may not insert) if `Supervisor` is already closed + * Must return `false` (and might not insert) if `Supervisor` is already closed */ def add(token: Unique.Token, fiber: Fiber[F, Throwable, _]): F[Boolean] - // these are allowed to destroy the state, since they're called during closing: + // these are allowed to destroy the state, since they're only called during closing: val joinAll: F[Unit] val cancelAll: F[Unit] } private def supervisor[F[_]]( - mkState: Ref[F, Boolean] => F[State[F]], + mkState: Ref[F, Boolean] => F[State[F]], // receives the main shutdown flag await: Boolean, - checkRestart: Option[Outcome[F, Throwable, _] => Boolean])( + checkRestart: Option[Outcome[F, Throwable, _] => Boolean])( // `None` never restarts implicit F: Concurrent[F]): Resource[F, Supervisor[F]] = { for { doneR <- Resource.eval(F.ref(false)) state <- Resource.makeCase(mkState(doneR)) { - case (st, Resource.ExitCase.Succeeded) if await => doneR.set(true) >> st.joinAll + case (st, Resource.ExitCase.Succeeded) if await => + doneR.set(true) *> st.joinAll case (st, _) => - doneR.set(true) >> { /*println("canceling all!");*/ - st.cancelAll - } + doneR.set(true) *> st.cancelAll } } yield new Supervisor[F] { @@ -223,6 +225,7 @@ object Supervisor { fin.guarantee( resultR.complete(Outcome.Canceled()).void) } else { + // this should never happen F.raiseError(new AssertionError("unexpected fiber")) } } From c976b401b08c9562a4356dff4161e432e9da5699 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Fri, 26 Jan 2024 01:05:32 +0100 Subject: [PATCH 08/17] More comments, refactoring --- .../scala/cats/effect/std/Supervisor.scala | 44 ++++++++++++++++--- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index 4f1cce95bd..2e4a837f0a 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -178,18 +178,30 @@ object Supervisor { F.deferred[Outcome[F, Throwable, A]] flatMap { resultR => F.ref(false) flatMap { canceledR => F.deferred[Fiber[F, Throwable, A]].flatMap { firstCurrent => + // `currentR` holds (a `Deferred` to) the current + // incarnation of the fiber executing `fa`: F.ref(firstCurrent).flatMap { currentR => def action(current: Deferred[F, Fiber[F, Throwable, A]]): F[Unit] = { F uncancelable { _ => val started = F start { fa guaranteeCase { oc => F.deferred[Fiber[F, Throwable, A]].flatMap { newCurrent => + // we're replacing the `Deferred` holding + // the current fiber with a new one BEFORE + // the current fiber finishes; crucially, + // this means that the fiber reachable + // through `currentR` can be (is) "early" + // but it's never "late": currentR.set(newCurrent) *> { canceledR.get flatMap { canceled => doneR.get flatMap { done => if (!canceled && !done && restart(oc)) { action(newCurrent) } else { + // we must complete `newCurrent`, + // because `cancel` below may wait + // for it; we signal that it is not + // restarted with `null`: newCurrent.complete(null) *> fin.guarantee( resultR.complete(oc).void) } @@ -212,24 +224,42 @@ object Supervisor { val cancel: F[Unit] = F uncancelable { _ => canceledR.set(true) *> delegateF flatMap { case null => + // ok, task wasn't restarted, but we + // wait for the result to be completed: resultR.get.void case fiber => + // a fiber is executing our task, + // cancel it, and wait for it: fiber.cancel *> fiber.join flatMap { case Outcome.Canceled() => - // double-check: + // cancel successful (or self-canceled), + // but we don't know if the `guaranteeCase` + // above ran so we need to double check: delegateF.flatMap { case null => + // ok, the `guaranteeCase` + // certainly executed/ing: resultR.get.void case fiber2 => + // we cancelled the fiber before it did + // anything, so the finalizer didn't run, + // we need to do it now: + val cleanup = fin.guarantee( + resultR.complete(Outcome.Canceled()).void + ) + // just to be sure: if (fiber2 eq fiber) { - fin.guarantee( - resultR.complete(Outcome.Canceled()).void) + cleanup } else { // this should never happen - F.raiseError(new AssertionError("unexpected fiber")) + cleanup *> F.raiseError( + new AssertionError("unexpected fiber")) } } case _ => + // finished in error/success, + // the outcome will certainly + // be completed: resultR.get.void } } @@ -253,12 +283,16 @@ object Supervisor { token <- F.unique cleanup = state.remove(token) fiber <- monitor( + // if the supervisor have been (or is now) + // shutting down, inserting into state will + // fail; so we need to wait for the result + // of inserting before actually doing the task: insertResult.get.ifM(fa, F.canceled *> F.never[A]), done.set(true) *> cleanup ) insertOk <- state.add(token, fiber) _ <- insertResult.complete(insertOk) - // `cleanup` could run *before* the `state.add` + // `cleanup` could run BEFORE the `state.add` // (if `fa` is very fast), in which case it doesn't // remove the fiber from the state, so we re-check: _ <- done.get.ifM(cleanup, F.unit) From ea178f9e9917a794513f1cab286b5e98bd3e72a4 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Fri, 26 Jan 2024 01:40:55 +0100 Subject: [PATCH 09/17] Removed slow and ugly test --- build.sbt | 3 ++- .../scala/cats/effect/std/Supervisor.scala | 4 ++-- .../cats/effect/std/SupervisorSpec.scala | 20 ++----------------- 3 files changed, 6 insertions(+), 21 deletions(-) diff --git a/build.sbt b/build.sbt index 828a8de112..e829482021 100644 --- a/build.sbt +++ b/build.sbt @@ -895,7 +895,8 @@ lazy val tests: CrossProject = crossProject(JSPlatform, JVMPlatform, NativePlatf ) .jvmSettings( Test / fork := true, - Test / javaOptions += s"-Dsbt.classpath=${(Test / fullClasspath).value.map(_.data.getAbsolutePath).mkString(File.pathSeparator)}" + Test / javaOptions += s"-Dsbt.classpath=${(Test / fullClasspath).value.map(_.data.getAbsolutePath).mkString(File.pathSeparator)}", + // Test / javaOptions += "-XX:ActiveProcessorCount=2", ) lazy val testsJS = tests.js diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index 2e4a837f0a..99d22ce4ff 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -285,8 +285,8 @@ object Supervisor { fiber <- monitor( // if the supervisor have been (or is now) // shutting down, inserting into state will - // fail; so we need to wait for the result - // of inserting before actually doing the task: + // fail; so we need to wait for the positive result + // of inserting, before actually doing the task: insertResult.get.ifM(fa, F.canceled *> F.never[A]), done.set(true) *> cleanup ) diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index 328fe3217b..20f519506a 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -23,13 +23,11 @@ import cats.syntax.all._ import scala.concurrent.duration._ -import java.lang.ref.WeakReference - class SupervisorSpec extends BaseSpec with DetectPlatform { sequential - override def executionTimeout = 120.seconds + override def executionTimeout = 30.seconds "Supervisor" should { "concurrent" >> { @@ -296,7 +294,7 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { } } } - tsk.parReplicateA_(if (isJVM) 10000 else 5).as(ok) + tsk.parReplicateA_(if (isJVM) 5000 else 5).as(ok) } "submit to closed supervisor" in real { @@ -323,19 +321,5 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { tsk.parReplicateA_(if (isJVM) 10000 else 5).as(ok) } - - "fiber finish / state.add race" in real { // was fixed in #1670 - val tsk = constructor(false, None).use { supervisor => - supervisor - .supervise(IO.unit) - .flatTap(_.joinWithNever) - .map(new WeakReference(_)) - .flatMap { fiberWr => - (IO(System.gc()) *> IO.sleep(100.millis)).whileM_(IO(fiberWr.get() ne null)).as(ok) - } - } - - tsk.parReplicateA_(if (isJVM) 2000 else 1).as(ok) - } } } From 2fe6225d1142ab5fc7d416cc5b609e36ace43eea Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Fri, 26 Jan 2024 01:41:38 +0100 Subject: [PATCH 10/17] Removed silly tests --- .../cats/effect/std/SupervisorSpec.scala | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index 20f519506a..9a211e2a7c 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -236,29 +236,6 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { test.start.flatMap(_.join).as(ok).timeoutTo(2.seconds, IO(false must beTrue)) } - "self-cancel loop" in real { - IO.ref(0L).flatMap { counter => - constructor(true, Some(_ => true)) - .use { supervisor => - val task = counter.update(_ + 1L) *> IO.canceled - supervisor.supervise(task) *> IO.sleep(100.millis) - } - .flatMap { _ => counter.get.flatMap { count => IO(count must beGreaterThan(1L)) } } - } - } - - "lots of simple tasks" in real { - val N = if (isJVM) 10000 else 5 - IO.ref(0L).flatMap { counter => - constructor(true, Some(_ => false)) - .use { supervisor => - val task = counter.update(_ + 1L) - supervisor.supervise(task).parReplicateA(N).void - } - .flatMap { _ => counter.get.flatMap { count => IO(count mustEqual N) } } - } - } - "supervise / finalize race" in real { superviseFinalizeRace(constructor(false, None), IO.never[Unit]) } From b471bcd97761066ce2ad8ff4aac5f6bcb47f1d5d Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Fri, 26 Jan 2024 02:16:53 +0100 Subject: [PATCH 11/17] Tweak comments --- .../scala/cats/effect/std/Supervisor.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index 99d22ce4ff..e794be65bb 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -187,11 +187,12 @@ object Supervisor { fa guaranteeCase { oc => F.deferred[Fiber[F, Throwable, A]].flatMap { newCurrent => // we're replacing the `Deferred` holding - // the current fiber with a new one BEFORE - // the current fiber finishes; crucially, - // this means that the fiber reachable - // through `currentR` can be (is) "early" - // but it's never "late": + // the current fiber with a new one before + // the current fiber finishes, and even + // before we check for the cancel signal; + // this guarantees, that the fiber reachable + // through `currentR` is the last one (or + // null, see below): currentR.set(newCurrent) *> { canceledR.get flatMap { canceled => doneR.get flatMap { done => @@ -222,14 +223,17 @@ object Supervisor { private[this] val delegateF = currentR.get.flatMap(_.get) val cancel: F[Unit] = F uncancelable { _ => + // after setting `canceledR`, at + // most one restart happens, and + // the fiber we get through `delegateF` + // is the final one: canceledR.set(true) *> delegateF flatMap { case null => // ok, task wasn't restarted, but we - // wait for the result to be completed: + // wait for the result to be completed + // (and the finalizer to run): resultR.get.void case fiber => - // a fiber is executing our task, - // cancel it, and wait for it: fiber.cancel *> fiber.join flatMap { case Outcome.Canceled() => // cancel successful (or self-canceled), @@ -247,7 +251,6 @@ object Supervisor { val cleanup = fin.guarantee( resultR.complete(Outcome.Canceled()).void ) - // just to be sure: if (fiber2 eq fiber) { cleanup } else { From 255cbb1fb91546c591b732de72c5e8a059ece04a Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Fri, 26 Jan 2024 02:40:57 +0100 Subject: [PATCH 12/17] Fix for Scala 2.12 --- std/shared/src/main/scala/cats/effect/std/Supervisor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index e794be65bb..12a2152e69 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -320,7 +320,7 @@ object Supervisor { def remove(token: Unique.Token): F[Unit] = stateRef.update { case null => null - case map => map.removed(token) + case map => map - token } def add(token: Unique.Token, fiber: Fiber[F, Throwable, _]): F[Boolean] = From 816a7e31b6a6e5a4a31089e0b9047579233fc426 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Fri, 26 Jan 2024 02:56:29 +0100 Subject: [PATCH 13/17] scalafix --- .../src/test/scala/cats/effect/std/SupervisorSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index 9a211e2a7c..8f11a1ea37 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -17,10 +17,10 @@ package cats.effect package std -import org.specs2.specification.core.Fragments - import cats.syntax.all._ +import org.specs2.specification.core.Fragments + import scala.concurrent.duration._ class SupervisorSpec extends BaseSpec with DetectPlatform { From c228bf1a12705cb7193b6b6c194f863963edd6ca Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Fri, 26 Jan 2024 03:23:46 +0100 Subject: [PATCH 14/17] mima --- build.sbt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index e829482021..133148f727 100644 --- a/build.sbt +++ b/build.sbt @@ -982,7 +982,9 @@ lazy val std = crossProject(JSPlatform, JVMPlatform, NativePlatform) "cats.effect.std.Queue$UnsafeUnbounded$Cell"), // introduced by #3480 // adds method to sealed Hotswap - ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.std.Hotswap.get") + ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.std.Hotswap.get"), + // #3972, private trait + ProblemFilters.exclude[IncompatibleTemplateDefProblem]("cats.effect.std.Supervisor$State"), ) ) .jsSettings( From 5cb14127c420d05862e31e676b723cc7611090c9 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Fri, 26 Jan 2024 04:05:26 +0100 Subject: [PATCH 15/17] CI needs bigger timeout --- .../shared/src/test/scala/cats/effect/std/SupervisorSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index 8f11a1ea37..35e0e5055f 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -27,7 +27,7 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { sequential - override def executionTimeout = 30.seconds + override def executionTimeout = 60.seconds "Supervisor" should { "concurrent" >> { From 7b69da179a8064e15d2543269f8dc2ca3f333a23 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Sat, 27 Jan 2024 12:22:16 +0100 Subject: [PATCH 16/17] Address review suggestions --- .../scala/cats/effect/std/Supervisor.scala | 6 +++++- .../scala/cats/effect/std/SupervisorSpec.scala | 18 +++++++----------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index 12a2152e69..a98cf6ec61 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -329,8 +329,12 @@ object Supervisor { case map => (map.updated(token, fiber), true) } - private[this] val allFibers: F[List[Fiber[F, Throwable, _]]] = + private[this] val allFibers: F[List[Fiber[F, Throwable, _]]] = { + // we're closing, so we won't need the state any more, + // so we're using `null` as a sentinel to reject later + // insertions in `add`: stateRef.getAndSet(null).map(_.values.toList) + } val joinAll: F[Unit] = allFibers.flatMap(_.traverse_(_.join.void)) diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index 35e0e5055f..6e1fc9543f 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -25,10 +25,6 @@ import scala.concurrent.duration._ class SupervisorSpec extends BaseSpec with DetectPlatform { - sequential - - override def executionTimeout = 60.seconds - "Supervisor" should { "concurrent" >> { supervisorTests(Supervisor.applyForConcurrent) @@ -219,7 +215,7 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { } // if this doesn't work properly, the test will hang - test.start.flatMap(_.join).as(ok).timeoutTo(2.seconds, IO(false must beTrue)) + test.start.flatMap(_.join).as(ok).timeoutTo(3.seconds, IO(false must beTrue)) } "cancel inner fiber and ignore restart if outer errored" in real { @@ -233,7 +229,7 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { } // if this doesn't work properly, the test will hang - test.start.flatMap(_.join).as(ok).timeoutTo(2.seconds, IO(false must beTrue)) + test.start.flatMap(_.join).as(ok).timeoutTo(3.seconds, IO(false must beTrue)) } "supervise / finalize race" in real { @@ -250,8 +246,8 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { case (supervisor, close) => supervisor.supervise(IO.never[Unit]).replicateA(100).flatMap { fibers => val tryFork = supervisor.supervise(task).map(Some(_)).recover { - case ex: IllegalStateException - if ex.getMessage == "supervisor already shutdown" => + case ex: IllegalStateException => + ex.getMessage mustEqual "supervisor already shutdown" None } IO.both(tryFork, close).flatMap { @@ -271,7 +267,7 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { } } } - tsk.parReplicateA_(if (isJVM) 5000 else 5).as(ok) + tsk.parReplicateA_(if (isJVM) 1000 else 1).as(ok) } "submit to closed supervisor" in real { @@ -289,14 +285,14 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { IO.sleep(100.millis) *> adaptedFiber.cancel *> adaptedFiber.join *> ( (counter.get, IO.sleep(100.millis) *> counter.get).flatMapN { case (v1, v2) => - IO((v1: Int) mustEqual (v2: Int)) + IO(v1 mustEqual v2) } ) } } } - tsk.parReplicateA_(if (isJVM) 10000 else 5).as(ok) + tsk.parReplicateA_(if (isJVM) 1000 else 1).as(ok) } } } From 35b19bca0ba3db10f46f7366accf3585ec047bb1 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Sat, 27 Jan 2024 12:28:03 +0100 Subject: [PATCH 17/17] Make clearer assertions about things that should never happen --- .../src/main/scala/cats/effect/std/Supervisor.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index a98cf6ec61..cc230c7992 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -255,8 +255,8 @@ object Supervisor { cleanup } else { // this should never happen - cleanup *> F.raiseError( - new AssertionError("unexpected fiber")) + cleanup *> F.raiseError(new AssertionError( + "unexpected fiber (this is a bug in Supervisor)")) } } case _ => @@ -290,7 +290,13 @@ object Supervisor { // shutting down, inserting into state will // fail; so we need to wait for the positive result // of inserting, before actually doing the task: - insertResult.get.ifM(fa, F.canceled *> F.never[A]), + insertResult + .get + .ifM( + fa, + F.canceled *> F.raiseError[A](new AssertionError( + "supervised fiber couldn't cancel (this is a bug in Supervisor)")) + ), done.set(true) *> cleanup ) insertOk <- state.add(token, fiber)