Skip to content

Commit

Permalink
Make timeout/timeoutTo always return the outcome of the effect
Browse files Browse the repository at this point in the history
`timeout*` methods are implemented in terms of a race between a desired
effect and the timeout. In the case that both effects complete
simultaneously, it could happen that the timeout would win the race, a
`TimeoutException` be raised, and the outcome of the desired effect
lost.

As is noted in #3456, this is a general problem with the `race*`
methods, and can't be addressed in the general case without breaking the
current interfaces.

This change is a more narrow take on the problem specifically focusing
on the `timeout` and `timeoutTo` methods. As these methods inherently
wait for both racing effects to complete, the implementation is changed
to always take into account the outcome of the desired effect, only
raising a `TimeoutException` if the timeout won the race *and* the
desired effect was effectively canceled. Similarly, errors from the
desired effect are preferentially propagated over the generic
`TimeoutException`.

The `timeoutAndForget` methods are left unchanged, as they explicitly
avoid waiting for the losing effect to finish.

This change allows for `timeout` and `timeoutTo` methods to be safely
used on effects that acquire resources, such as `Semaphore.acquire`,
ensuring that successful outcomes are always propagated back to the
user.
  • Loading branch information
biochimia committed Apr 21, 2024
1 parent 1ba83f0 commit ff6a283
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 9 deletions.
9 changes: 6 additions & 3 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -819,9 +819,12 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
*/
def timeoutTo[A2 >: A](duration: Duration, fallback: IO[A2]): IO[A2] = {
handleDuration[IO[A2]](duration, this) { finiteDuration =>
race(IO.sleep(finiteDuration)).flatMap {
case Right(_) => fallback
case Left(value) => IO.pure(value)
IO.uncancelable { poll =>
poll(racePair(IO.sleep(finiteDuration))) flatMap {
case Left((oc, f)) => f.cancel *> oc.embed(poll(IO.canceled) *> IO.never)
case Right((f, _)) =>
f.cancel *> f.join.flatMap { oc => oc.fold(fallback, IO.raiseError, identity) }
}
}
}
}
Expand Down
23 changes: 17 additions & 6 deletions kernel/shared/src/main/scala/cats/effect/kernel/GenTemporal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,13 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] {
handleDuration(duration, fa)(timeoutTo(fa, _, fallback))

protected def timeoutTo[A](fa: F[A], duration: FiniteDuration, fallback: F[A]): F[A] =
flatMap(race(fa, sleep(duration))) {
case Left(a) => pure(a)
case Right(_) => fallback
uncancelable { poll =>
implicit val F: GenTemporal[F, E] = this

poll(racePair(fa, sleep(duration))) flatMap {
case Left((oc, f)) => f.cancel *> oc.embed(poll(F.canceled) *> F.never)
case Right((f, _)) => f.cancel *> f.join.flatMap { oc => oc.embed(fallback) }
}
}

/**
Expand All @@ -115,9 +119,16 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] {

protected def timeout[A](fa: F[A], duration: FiniteDuration)(
implicit ev: TimeoutException <:< E): F[A] = {
flatMap(race(fa, sleep(duration))) {
case Left(a) => pure(a)
case Right(_) => raiseError[A](ev(new TimeoutException(duration.toString())))
uncancelable { poll =>
implicit val F: GenTemporal[F, E] = this

poll(racePair(fa, sleep(duration))) flatMap {
case Left((oc, f)) => f.cancel *> oc.embed(poll(F.canceled) *> F.never)
case Right((f, _)) =>
f.cancel *> f.join.flatMap { oc =>
oc.embed(raiseError[A](ev(new TimeoutException(duration.toString()))))
}
}
}
}

Expand Down
25 changes: 25 additions & 0 deletions tests/shared/src/test/scala/cats/effect/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,31 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
"non-terminate on an uncancelable fiber" in ticked { implicit ticker =>
IO.never.uncancelable.timeout(1.second) must nonTerminate
}

"propagate successful result from a completed effect" in real {
IO
.pure(true)
.delayBy(50.millis)
.uncancelable
.timeout(10.millis)
.map { res =>
res must beTrue
}
}

"propagate error from a completed effect" in real {
IO
.raiseError(new RuntimeException)
.delayBy(50.millis)
.uncancelable
.timeout(10.millis)
.attempt
.map { res =>
res must beLike {
case Left(e) => e must haveClass[RuntimeException]
}
}
}
}

"timeoutTo" should {
Expand Down

0 comments on commit ff6a283

Please sign in to comment.