From 718c78ef53c76981e6c01fc59ec19ac490cf0a01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Abecasis?= Date: Fri, 19 Apr 2024 01:45:23 +0100 Subject: [PATCH] Make `timeout`/`timeoutTo` always return the outcome of the effect `timeout*` methods are implemented in terms of a race between a desired effect and the timeout. In the case that both effects complete simultaneously, it could happen that the timeout would win the race, a `TimeoutException` be raised, and the outcome of the desired effect lost. As is noted in #3456, this is a general problem with the `race*` methods, and can't be addressed in the general case without breaking the current interfaces. This change is a more narrow take on the problem specifically focusing on the `timeout` and `timeoutTo` methods. As these methods inherently wait for both racing effects to complete, the implementation is changed to always take into account the outcome of the desired effect, only raising a `TimeoutException` if the timeout won the race *and* the desired effect was effectively canceled. Similarly, errors from the desired effect are preferentially propagated over the generic `TimeoutException`. The `timeoutAndForget` methods are left unchanged, as they explicitly avoid waiting for the losing effect to finish. This change allows for `timeout` and `timeoutTo` methods to be safely used on effects that acquire resources, such as `Semaphore.acquire`, ensuring that successful outcomes are always propagated back to the user. --- .../src/main/scala/cats/effect/IO.scala | 57 ++++++++---- .../cats/effect/kernel/GenTemporal.scala | 70 ++++++++++----- .../cats/effect/laws/GenTemporalSpec.scala | 89 +++++++++++++++---- .../src/test/scala/cats/effect/IOSpec.scala | 17 ++++ 4 files changed, 177 insertions(+), 56 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 7138a0b915..8f74ae0871 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -779,17 +779,27 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] { andWait(duration: Duration) /** - * Returns an IO that either completes with the result of the source within the specified time - * `duration` or otherwise raises a `TimeoutException`. + * Returns an IO that either completes with the result of the source or otherwise raises a + * `TimeoutException`. * - * The source is canceled in the event that it takes longer than the specified time duration - * to complete. Once the source has been successfully canceled (and has completed its - * finalizers), the `TimeoutException` will be raised. If the source is uncancelable, the - * resulting effect will wait for it to complete before raising the exception. + * The source is raced against the timeout `duration`, and its cancelation is triggered if the + * source doesn't complete within the specified time. The resulting effect will always wait + * for the source effect to complete (and to complete its finalizers), and will return the + * source's outcome over raising a `TimeoutException`. + * + * In case source and timeout complete simultaneously, the result of the source will be + * returned over raising a `TimeoutException`. + * + * If the source effect is uncancelable, a `TimeoutException` will never be raised. * * @param duration - * is the time span for which we wait for the source to complete; in the event that the - * specified time has passed without the source completing, a `TimeoutException` is raised + * is the time span for which we wait for the source to complete before triggering its + * cancelation; in the event that the specified time has passed without the source + * completing, a `TimeoutException` is raised + * + * @see + * [[timeoutAndForget]] for a variant which does not wait for cancelation of the source + * effect to complete. */ def timeout[A2 >: A](duration: Duration): IO[A2] = handleDuration(duration, this) { finiteDuration => @@ -802,26 +812,35 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] { timeout(duration: Duration) /** - * Returns an IO that either completes with the result of the source within the specified time - * `duration` or otherwise evaluates the `fallback`. + * Returns an IO that either completes with the result of the source or otherwise evaluates + * the `fallback`. * - * The source is canceled in the event that it takes longer than the specified time duration - * to complete. Once the source has been successfully canceled (and has completed its - * finalizers), the fallback will be sequenced. If the source is uncancelable, the resulting - * effect will wait for it to complete before evaluating the fallback. + * The source is raised against the timeout `duration`, and its cancelation is triggered if + * the source doesn't complete within the specified time. The resulting effect will always + * wait for the source effect to complete (and to complete its finalizers), and will return + * the source's outcome over sequencing the `fallback`. + * + * In case source and timeout complete simultaneously, the result of the source will be + * returned over sequencing the `fallback`. + * + * If the source in uncancelable, `fallback` will never be evaluated. * * @param duration - * is the time span for which we wait for the source to complete; in the event that the - * specified time has passed without the source completing, the `fallback` gets evaluated + * is the time span for which we wait for the source to complete before triggering its + * cancelation; in the event that the specified time has passed without the source + * completing, the `fallback` gets evaluated * * @param fallback * is the task evaluated after the duration has passed and the source canceled */ def timeoutTo[A2 >: A](duration: Duration, fallback: IO[A2]): IO[A2] = { handleDuration[IO[A2]](duration, this) { finiteDuration => - race(IO.sleep(finiteDuration)).flatMap { - case Right(_) => fallback - case Left(value) => IO.pure(value) + IO.uncancelable { poll => + poll(racePair(IO.sleep(finiteDuration))) flatMap { + case Left((oc, f)) => f.cancel *> oc.embed(poll(IO.canceled) *> IO.never) + case Right((f, _)) => + f.cancel *> f.join.flatMap { oc => oc.fold(fallback, IO.raiseError, identity) } + } } } } diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/GenTemporal.scala b/kernel/shared/src/main/scala/cats/effect/kernel/GenTemporal.scala index 56d6d0d270..3b572c3bcd 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/GenTemporal.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/GenTemporal.scala @@ -72,17 +72,23 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] { productL(fa)(sleep(time)) /** - * Returns an effect that either completes with the result of the source within the specified - * time `duration` or otherwise evaluates the `fallback`. + * Returns an effect that either completes with the result of the source or otherwise + * evaluates the `fallback`. * - * The source is canceled in the event that it takes longer than the specified time duration - * to complete. Once the source has been successfully canceled (and has completed its - * finalizers), the fallback will be sequenced. If the source is uncancelable, the resulting - * effect will wait for it to complete before evaluating the fallback. + * The source is raised against the timeout `duration`, and its cancelation is triggered if + * the source doesn't complete within the specified time. The resulting effect will always + * wait for the source effect to complete (and to complete its finalizers), and will return + * the source's outcome over sequencing the `fallback`. + * + * In case source and timeout complete simultaneously, the result of the source will be + * returned over sequencing the `fallback`. + * + * If the source in uncancelable, `fallback` will never be evaluated. * * @param duration - * The time span for which we wait for the source to complete; in the event that the - * specified time has passed without the source completing, the `fallback` gets evaluated + * The time span for which we wait for the source to complete before triggering its + * cancelation; in the event that the specified time has passed without the source + * completing, the `fallback` gets evaluated * * @param fallback * The task evaluated after the duration has passed and the source canceled @@ -91,23 +97,36 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] { handleDuration(duration, fa)(timeoutTo(fa, _, fallback)) protected def timeoutTo[A](fa: F[A], duration: FiniteDuration, fallback: F[A]): F[A] = - flatMap(race(fa, sleep(duration))) { - case Left(a) => pure(a) - case Right(_) => fallback + uncancelable { poll => + implicit val F: GenTemporal[F, E] = this + + poll(racePair(fa, sleep(duration))) flatMap { + case Left((oc, f)) => f.cancel *> oc.embed(poll(F.canceled) *> F.never) + case Right((f, _)) => f.cancel *> f.join.flatMap { oc => oc.embed(fallback) } + } } /** - * Returns an effect that either completes with the result of the source within the specified - * time `duration` or otherwise raises a `TimeoutException`. + * Returns an effect that either completes with the result of the source or raises a + * `TimeoutException`. * - * The source is canceled in the event that it takes longer than the specified time duration - * to complete. Once the source has been successfully canceled (and has completed its - * finalizers), the `TimeoutException` will be raised. If the source is uncancelable, the - * resulting effect will wait for it to complete before raising the exception. + * The source is raced against the timeout `duration`, and its cancelation is triggered if the + * source doesn't complete within the specified time. The resulting effect will always wait + * for the source effect to complete (and to complete its finalizers), and will return the + * source's outcome over raising a `TimeoutException`. + * + * In case source and timeout complete simultaneously, the result of the source will be + * returned over raising a `TimeoutException`. + * + * If the source effect is uncancelable, a `TimeoutException` will never be raised. * * @param duration - * The time span for which we wait for the source to complete; in the event that the - * specified time has passed without the source completing, a `TimeoutException` is raised + * The time span for which we wait for the source to complete before triggering its + * cancelation; in the event that the specified time has passed without the source + * completing, a `TimeoutException` is raised + * @see + * [[timeoutAndForget[A](fa:F[A],duration:scala\.concurrent\.duration\.Duration)* timeoutAndForget]] + * for a variant which does not wait for cancelation of the source effect to complete. */ def timeout[A](fa: F[A], duration: Duration)(implicit ev: TimeoutException <:< E): F[A] = { handleDuration(duration, fa)(timeout(fa, _)) @@ -115,9 +134,16 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] { protected def timeout[A](fa: F[A], duration: FiniteDuration)( implicit ev: TimeoutException <:< E): F[A] = { - flatMap(race(fa, sleep(duration))) { - case Left(a) => pure(a) - case Right(_) => raiseError[A](ev(new TimeoutException(duration.toString()))) + uncancelable { poll => + implicit val F: GenTemporal[F, E] = this + + poll(racePair(fa, sleep(duration))) flatMap { + case Left((oc, f)) => f.cancel *> oc.embed(poll(F.canceled) *> F.never) + case Right((f, _)) => + f.cancel *> f.join.flatMap { oc => + oc.embed(raiseError[A](ev(new TimeoutException(duration.toString())))) + } + } } } diff --git a/laws/shared/src/test/scala/cats/effect/laws/GenTemporalSpec.scala b/laws/shared/src/test/scala/cats/effect/laws/GenTemporalSpec.scala index fa5e6c15d0..7a268de612 100644 --- a/laws/shared/src/test/scala/cats/effect/laws/GenTemporalSpec.scala +++ b/laws/shared/src/test/scala/cats/effect/laws/GenTemporalSpec.scala @@ -18,15 +18,15 @@ package cats package effect package laws -import cats.effect.kernel.Temporal +import cats.effect.kernel.{Outcome, Temporal} import cats.effect.kernel.testkit.TimeT import cats.effect.kernel.testkit.pure._ import cats.syntax.all._ import org.specs2.mutable.Specification +import scala.concurrent.TimeoutException import scala.concurrent.duration._ -// import scala.concurrent.TimeoutException class GenTemporalSpec extends Specification { outer => @@ -43,6 +43,40 @@ class GenTemporalSpec extends Specification { outer => val fa = F.pure(true) F.timeout(fa, Duration.Inf) mustEqual fa } + + "succeed on a fast action" in { + val fa: TimeT[F, Boolean] = F.pure(true) + val op = F.timeout(fa, Duration.Zero) + + run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true)) + } + + "error out on a slow action" in { + val fa: TimeT[F, Boolean] = F.never *> F.pure(true) + val op = F.timeout(fa, Duration.Zero) + + run(TimeT.run(op)) must beLike { + case Outcome.Errored(e) => e must haveClass[TimeoutException] + } + } + + "propagate successful outcome of uncancelable action" in { + val fa = F.uncancelable(_ => F.sleep(50.millis) *> F.pure(true)) + val op = F.timeout(fa, Duration.Zero) + + run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true)) + } + + "propagate errors from uncancelable action" in { + val fa = F.uncancelable { _ => + F.sleep(50.millis) *> F.raiseError(new RuntimeException("fa failed")) *> F.pure(true) + } + val op = F.timeout(fa, Duration.Zero) + + run(TimeT.run(op)) must beLike { + case Outcome.Errored(e: RuntimeException) => e.getMessage mustEqual "fa failed" + } + } } "timeoutTo" should { @@ -51,6 +85,44 @@ class GenTemporalSpec extends Specification { outer => val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException) F.timeoutTo(fa, Duration.Inf, fallback) mustEqual fa } + + "succeed on a fast action" in { + val fa: TimeT[F, Boolean] = F.pure(true) + val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException) + val op = F.timeoutTo(fa, Duration.Zero, fallback) + + run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true)) + } + + "error out on a slow action" in { + val fa: TimeT[F, Boolean] = F.never *> F.pure(true) + val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException) + val op = F.timeoutTo(fa, Duration.Zero, fallback) + + run(TimeT.run(op)) must beLike { + case Outcome.Errored(e) => e must haveClass[RuntimeException] + } + } + + "propagate successful outcome of uncancelable action" in { + val fa = F.uncancelable(_ => F.sleep(50.millis) *> F.pure(true)) + val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException) + val op = F.timeoutTo(fa, Duration.Zero, fallback) + + run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true)) + } + + "propagate errors from uncancelable action" in { + val fa = F.uncancelable { _ => + F.sleep(50.millis) *> F.raiseError(new RuntimeException("fa failed")) *> F.pure(true) + } + val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException) + val op = F.timeoutTo(fa, Duration.Zero, fallback) + + run(TimeT.run(op)) must beLike { + case Outcome.Errored(e: RuntimeException) => e.getMessage mustEqual "fa failed" + } + } } "timeoutAndForget" should { @@ -63,13 +135,6 @@ class GenTemporalSpec extends Specification { outer => // TODO enable these tests once Temporal for TimeT is fixed /*"temporal" should { - "timeout" should { - "succeed" in { - val op = F.timeout(F.pure(true), 10.seconds) - - run(TimeT.run(op)) mustEqual Succeeded(Some(true)) - }.pendingUntilFixed - "cancel a loop" in { val op: TimeT[F, Either[Throwable, Unit]] = F.timeout(loop, 5.millis).attempt @@ -80,12 +145,6 @@ class GenTemporalSpec extends Specification { outer => } "timeoutTo" should { - "succeed" in { - val op: TimeT[F, Boolean] = F.timeoutTo(F.pure(true), 5.millis, F.raiseError(new RuntimeException)) - - run(TimeT.run(op)) mustEqual Succeeded(Some(true)) - }.pendingUntilFixed - "use fallback" in { val op: TimeT[F, Boolean] = F.timeoutTo(loop >> F.pure(false), 5.millis, F.pure(true)) diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index a4e2359f6b..381742308a 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -1853,6 +1853,23 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { "non-terminate on an uncancelable fiber" in ticked { implicit ticker => IO.never.uncancelable.timeout(1.second) must nonTerminate } + + "propagate successful result from a completed effect" in real { + IO.pure(true).delayBy(50.millis).uncancelable.timeout(10.millis).map { res => + res must beTrue + } + } + + "propagate error from a completed effect" in real { + IO.raiseError(new RuntimeException) + .delayBy(50.millis) + .uncancelable + .timeout(10.millis) + .attempt + .map { res => + res must beLike { case Left(e) => e must haveClass[RuntimeException] } + } + } } "timeoutTo" should {