Skip to content

Commit

Permalink
Make timeout/timeoutTo always return the outcome of the effect
Browse files Browse the repository at this point in the history
`timeout*` methods are implemented in terms of a race between a desired
effect and the timeout. In the case that both effects complete
simultaneously, it could happen that the timeout would win the race, a
`TimeoutException` be raised, and the outcome of the desired effect
lost.

As is noted in #3456, this is a general problem with the `race*`
methods, and can't be addressed in the general case without breaking the
current interfaces.

This change is a more narrow take on the problem specifically focusing
on the `timeout` and `timeoutTo` methods. As these methods inherently
wait for both racing effects to complete, the implementation is changed
to always take into account the outcome of the desired effect, only
raising a `TimeoutException` if the timeout won the race *and* the
desired effect was effectively canceled. Similarly, errors from the
desired effect are preferentially propagated over the generic
`TimeoutException`.

The `timeoutAndForget` methods are left unchanged, as they explicitly
avoid waiting for the losing effect to finish.

This change allows for `timeout` and `timeoutTo` methods to be safely
used on effects that acquire resources, such as `Semaphore.acquire`,
ensuring that successful outcomes are always propagated back to the
user.
  • Loading branch information
biochimia committed Apr 29, 2024
1 parent 769a89e commit 718c78e
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 56 deletions.
57 changes: 38 additions & 19 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -779,17 +779,27 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
andWait(duration: Duration)

/**
* Returns an IO that either completes with the result of the source within the specified time
* `duration` or otherwise raises a `TimeoutException`.
* Returns an IO that either completes with the result of the source or otherwise raises a
* `TimeoutException`.
*
* The source is canceled in the event that it takes longer than the specified time duration
* to complete. Once the source has been successfully canceled (and has completed its
* finalizers), the `TimeoutException` will be raised. If the source is uncancelable, the
* resulting effect will wait for it to complete before raising the exception.
* The source is raced against the timeout `duration`, and its cancelation is triggered if the
* source doesn't complete within the specified time. The resulting effect will always wait
* for the source effect to complete (and to complete its finalizers), and will return the
* source's outcome over raising a `TimeoutException`.
*
* In case source and timeout complete simultaneously, the result of the source will be
* returned over raising a `TimeoutException`.
*
* If the source effect is uncancelable, a `TimeoutException` will never be raised.
*
* @param duration
* is the time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, a `TimeoutException` is raised
* is the time span for which we wait for the source to complete before triggering its
* cancelation; in the event that the specified time has passed without the source
* completing, a `TimeoutException` is raised
*
* @see
* [[timeoutAndForget]] for a variant which does not wait for cancelation of the source
* effect to complete.
*/
def timeout[A2 >: A](duration: Duration): IO[A2] =
handleDuration(duration, this) { finiteDuration =>
Expand All @@ -802,26 +812,35 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
timeout(duration: Duration)

/**
* Returns an IO that either completes with the result of the source within the specified time
* `duration` or otherwise evaluates the `fallback`.
* Returns an IO that either completes with the result of the source or otherwise evaluates
* the `fallback`.
*
* The source is canceled in the event that it takes longer than the specified time duration
* to complete. Once the source has been successfully canceled (and has completed its
* finalizers), the fallback will be sequenced. If the source is uncancelable, the resulting
* effect will wait for it to complete before evaluating the fallback.
* The source is raised against the timeout `duration`, and its cancelation is triggered if
* the source doesn't complete within the specified time. The resulting effect will always
* wait for the source effect to complete (and to complete its finalizers), and will return
* the source's outcome over sequencing the `fallback`.
*
* In case source and timeout complete simultaneously, the result of the source will be
* returned over sequencing the `fallback`.
*
* If the source in uncancelable, `fallback` will never be evaluated.
*
* @param duration
* is the time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, the `fallback` gets evaluated
* is the time span for which we wait for the source to complete before triggering its
* cancelation; in the event that the specified time has passed without the source
* completing, the `fallback` gets evaluated
*
* @param fallback
* is the task evaluated after the duration has passed and the source canceled
*/
def timeoutTo[A2 >: A](duration: Duration, fallback: IO[A2]): IO[A2] = {
handleDuration[IO[A2]](duration, this) { finiteDuration =>
race(IO.sleep(finiteDuration)).flatMap {
case Right(_) => fallback
case Left(value) => IO.pure(value)
IO.uncancelable { poll =>
poll(racePair(IO.sleep(finiteDuration))) flatMap {
case Left((oc, f)) => f.cancel *> oc.embed(poll(IO.canceled) *> IO.never)
case Right((f, _)) =>
f.cancel *> f.join.flatMap { oc => oc.fold(fallback, IO.raiseError, identity) }
}
}
}
}
Expand Down
70 changes: 48 additions & 22 deletions kernel/shared/src/main/scala/cats/effect/kernel/GenTemporal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,23 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] {
productL(fa)(sleep(time))

/**
* Returns an effect that either completes with the result of the source within the specified
* time `duration` or otherwise evaluates the `fallback`.
* Returns an effect that either completes with the result of the source or otherwise
* evaluates the `fallback`.
*
* The source is canceled in the event that it takes longer than the specified time duration
* to complete. Once the source has been successfully canceled (and has completed its
* finalizers), the fallback will be sequenced. If the source is uncancelable, the resulting
* effect will wait for it to complete before evaluating the fallback.
* The source is raised against the timeout `duration`, and its cancelation is triggered if
* the source doesn't complete within the specified time. The resulting effect will always
* wait for the source effect to complete (and to complete its finalizers), and will return
* the source's outcome over sequencing the `fallback`.
*
* In case source and timeout complete simultaneously, the result of the source will be
* returned over sequencing the `fallback`.
*
* If the source in uncancelable, `fallback` will never be evaluated.
*
* @param duration
* The time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, the `fallback` gets evaluated
* The time span for which we wait for the source to complete before triggering its
* cancelation; in the event that the specified time has passed without the source
* completing, the `fallback` gets evaluated
*
* @param fallback
* The task evaluated after the duration has passed and the source canceled
Expand All @@ -91,33 +97,53 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] {
handleDuration(duration, fa)(timeoutTo(fa, _, fallback))

protected def timeoutTo[A](fa: F[A], duration: FiniteDuration, fallback: F[A]): F[A] =
flatMap(race(fa, sleep(duration))) {
case Left(a) => pure(a)
case Right(_) => fallback
uncancelable { poll =>
implicit val F: GenTemporal[F, E] = this

poll(racePair(fa, sleep(duration))) flatMap {
case Left((oc, f)) => f.cancel *> oc.embed(poll(F.canceled) *> F.never)
case Right((f, _)) => f.cancel *> f.join.flatMap { oc => oc.embed(fallback) }
}
}

/**
* Returns an effect that either completes with the result of the source within the specified
* time `duration` or otherwise raises a `TimeoutException`.
* Returns an effect that either completes with the result of the source or raises a
* `TimeoutException`.
*
* The source is canceled in the event that it takes longer than the specified time duration
* to complete. Once the source has been successfully canceled (and has completed its
* finalizers), the `TimeoutException` will be raised. If the source is uncancelable, the
* resulting effect will wait for it to complete before raising the exception.
* The source is raced against the timeout `duration`, and its cancelation is triggered if the
* source doesn't complete within the specified time. The resulting effect will always wait
* for the source effect to complete (and to complete its finalizers), and will return the
* source's outcome over raising a `TimeoutException`.
*
* In case source and timeout complete simultaneously, the result of the source will be
* returned over raising a `TimeoutException`.
*
* If the source effect is uncancelable, a `TimeoutException` will never be raised.
*
* @param duration
* The time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, a `TimeoutException` is raised
* The time span for which we wait for the source to complete before triggering its
* cancelation; in the event that the specified time has passed without the source
* completing, a `TimeoutException` is raised
* @see
* [[timeoutAndForget[A](fa:F[A],duration:scala\.concurrent\.duration\.Duration)* timeoutAndForget]]
* for a variant which does not wait for cancelation of the source effect to complete.
*/
def timeout[A](fa: F[A], duration: Duration)(implicit ev: TimeoutException <:< E): F[A] = {
handleDuration(duration, fa)(timeout(fa, _))
}

protected def timeout[A](fa: F[A], duration: FiniteDuration)(
implicit ev: TimeoutException <:< E): F[A] = {
flatMap(race(fa, sleep(duration))) {
case Left(a) => pure(a)
case Right(_) => raiseError[A](ev(new TimeoutException(duration.toString())))
uncancelable { poll =>
implicit val F: GenTemporal[F, E] = this

poll(racePair(fa, sleep(duration))) flatMap {
case Left((oc, f)) => f.cancel *> oc.embed(poll(F.canceled) *> F.never)
case Right((f, _)) =>
f.cancel *> f.join.flatMap { oc =>
oc.embed(raiseError[A](ev(new TimeoutException(duration.toString()))))
}
}
}
}

Expand Down
89 changes: 74 additions & 15 deletions laws/shared/src/test/scala/cats/effect/laws/GenTemporalSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ package cats
package effect
package laws

import cats.effect.kernel.Temporal
import cats.effect.kernel.{Outcome, Temporal}
import cats.effect.kernel.testkit.TimeT
import cats.effect.kernel.testkit.pure._
import cats.syntax.all._

import org.specs2.mutable.Specification

import scala.concurrent.TimeoutException
import scala.concurrent.duration._
// import scala.concurrent.TimeoutException

class GenTemporalSpec extends Specification { outer =>

Expand All @@ -43,6 +43,40 @@ class GenTemporalSpec extends Specification { outer =>
val fa = F.pure(true)
F.timeout(fa, Duration.Inf) mustEqual fa
}

"succeed on a fast action" in {
val fa: TimeT[F, Boolean] = F.pure(true)
val op = F.timeout(fa, Duration.Zero)

run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true))
}

"error out on a slow action" in {
val fa: TimeT[F, Boolean] = F.never *> F.pure(true)
val op = F.timeout(fa, Duration.Zero)

run(TimeT.run(op)) must beLike {
case Outcome.Errored(e) => e must haveClass[TimeoutException]
}
}

"propagate successful outcome of uncancelable action" in {
val fa = F.uncancelable(_ => F.sleep(50.millis) *> F.pure(true))
val op = F.timeout(fa, Duration.Zero)

run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true))
}

"propagate errors from uncancelable action" in {
val fa = F.uncancelable { _ =>
F.sleep(50.millis) *> F.raiseError(new RuntimeException("fa failed")) *> F.pure(true)
}
val op = F.timeout(fa, Duration.Zero)

run(TimeT.run(op)) must beLike {
case Outcome.Errored(e: RuntimeException) => e.getMessage mustEqual "fa failed"
}
}
}

"timeoutTo" should {
Expand All @@ -51,6 +85,44 @@ class GenTemporalSpec extends Specification { outer =>
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
F.timeoutTo(fa, Duration.Inf, fallback) mustEqual fa
}

"succeed on a fast action" in {
val fa: TimeT[F, Boolean] = F.pure(true)
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
val op = F.timeoutTo(fa, Duration.Zero, fallback)

run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true))
}

"error out on a slow action" in {
val fa: TimeT[F, Boolean] = F.never *> F.pure(true)
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
val op = F.timeoutTo(fa, Duration.Zero, fallback)

run(TimeT.run(op)) must beLike {
case Outcome.Errored(e) => e must haveClass[RuntimeException]
}
}

"propagate successful outcome of uncancelable action" in {
val fa = F.uncancelable(_ => F.sleep(50.millis) *> F.pure(true))
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
val op = F.timeoutTo(fa, Duration.Zero, fallback)

run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true))
}

"propagate errors from uncancelable action" in {
val fa = F.uncancelable { _ =>
F.sleep(50.millis) *> F.raiseError(new RuntimeException("fa failed")) *> F.pure(true)
}
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
val op = F.timeoutTo(fa, Duration.Zero, fallback)

run(TimeT.run(op)) must beLike {
case Outcome.Errored(e: RuntimeException) => e.getMessage mustEqual "fa failed"
}
}
}

"timeoutAndForget" should {
Expand All @@ -63,13 +135,6 @@ class GenTemporalSpec extends Specification { outer =>

// TODO enable these tests once Temporal for TimeT is fixed
/*"temporal" should {
"timeout" should {
"succeed" in {
val op = F.timeout(F.pure(true), 10.seconds)
run(TimeT.run(op)) mustEqual Succeeded(Some(true))
}.pendingUntilFixed
"cancel a loop" in {
val op: TimeT[F, Either[Throwable, Unit]] = F.timeout(loop, 5.millis).attempt
Expand All @@ -80,12 +145,6 @@ class GenTemporalSpec extends Specification { outer =>
}
"timeoutTo" should {
"succeed" in {
val op: TimeT[F, Boolean] = F.timeoutTo(F.pure(true), 5.millis, F.raiseError(new RuntimeException))
run(TimeT.run(op)) mustEqual Succeeded(Some(true))
}.pendingUntilFixed
"use fallback" in {
val op: TimeT[F, Boolean] = F.timeoutTo(loop >> F.pure(false), 5.millis, F.pure(true))
Expand Down
17 changes: 17 additions & 0 deletions tests/shared/src/test/scala/cats/effect/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,23 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
"non-terminate on an uncancelable fiber" in ticked { implicit ticker =>
IO.never.uncancelable.timeout(1.second) must nonTerminate
}

"propagate successful result from a completed effect" in real {
IO.pure(true).delayBy(50.millis).uncancelable.timeout(10.millis).map { res =>
res must beTrue
}
}

"propagate error from a completed effect" in real {
IO.raiseError(new RuntimeException)
.delayBy(50.millis)
.uncancelable
.timeout(10.millis)
.attempt
.map { res =>
res must beLike { case Left(e) => e must haveClass[RuntimeException] }
}
}
}

"timeoutTo" should {
Expand Down

0 comments on commit 718c78e

Please sign in to comment.