Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make timeout/timeoutTo always return the outcome of the effect #4059

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 38 additions & 19 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -779,17 +779,27 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
andWait(duration: Duration)

/**
* Returns an IO that either completes with the result of the source within the specified time
* `duration` or otherwise raises a `TimeoutException`.
* Returns an IO that either completes with the result of the source or otherwise raises a
* `TimeoutException`.
*
* The source is canceled in the event that it takes longer than the specified time duration
* to complete. Once the source has been successfully canceled (and has completed its
* finalizers), the `TimeoutException` will be raised. If the source is uncancelable, the
* resulting effect will wait for it to complete before raising the exception.
* The source is raced against the timeout `duration`, and its cancelation is triggered if the
* source doesn't complete within the specified time. The resulting effect will always wait
* for the source effect to complete (and to complete its finalizers), and will return the
* source's outcome over raising a `TimeoutException`.
*
* In case source and timeout complete simultaneously, the result of the source will be
* returned over raising a `TimeoutException`.
*
* If the source effect is uncancelable, a `TimeoutException` will never be raised.
*
* @param duration
* is the time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, a `TimeoutException` is raised
* is the time span for which we wait for the source to complete before triggering its
* cancelation; in the event that the specified time has passed without the source
* completing, a `TimeoutException` is raised
*
* @see
* [[timeoutAndForget]] for a variant which does not wait for cancelation of the source
* effect to complete.
*/
def timeout[A2 >: A](duration: Duration): IO[A2] =
handleDuration(duration, this) { finiteDuration =>
Expand All @@ -802,26 +812,35 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
timeout(duration: Duration)

/**
* Returns an IO that either completes with the result of the source within the specified time
* `duration` or otherwise evaluates the `fallback`.
* Returns an IO that either completes with the result of the source or otherwise evaluates
* the `fallback`.
*
* The source is canceled in the event that it takes longer than the specified time duration
* to complete. Once the source has been successfully canceled (and has completed its
* finalizers), the fallback will be sequenced. If the source is uncancelable, the resulting
* effect will wait for it to complete before evaluating the fallback.
* The source is raised against the timeout `duration`, and its cancelation is triggered if
* the source doesn't complete within the specified time. The resulting effect will always
* wait for the source effect to complete (and to complete its finalizers), and will return
* the source's outcome over sequencing the `fallback`.
*
* In case source and timeout complete simultaneously, the result of the source will be
* returned over sequencing the `fallback`.
*
* If the source in uncancelable, `fallback` will never be evaluated.
*
* @param duration
* is the time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, the `fallback` gets evaluated
* is the time span for which we wait for the source to complete before triggering its
* cancelation; in the event that the specified time has passed without the source
* completing, the `fallback` gets evaluated
*
* @param fallback
* is the task evaluated after the duration has passed and the source canceled
*/
def timeoutTo[A2 >: A](duration: Duration, fallback: IO[A2]): IO[A2] = {
handleDuration[IO[A2]](duration, this) { finiteDuration =>
race(IO.sleep(finiteDuration)).flatMap {
case Right(_) => fallback
case Left(value) => IO.pure(value)
IO.uncancelable { poll =>
poll(racePair(IO.sleep(finiteDuration))) flatMap {
case Left((oc, f)) => f.cancel *> oc.embed(poll(IO.canceled) *> IO.never)
case Right((f, _)) =>
f.cancel *> f.join.flatMap { oc => oc.fold(fallback, IO.raiseError, identity) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the oc.fold here be oc.embed instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oc.embed does not apply here because the return type of IO.timeoutTo is IO[A2], and not IO[A], and is thus incompatible with Outcome.embed.

Note that the signature of timeoutTo in IO differs from that in GenTemporal. The latter returns F[A], keeping the same effect type.

}
}
}
}
Expand Down
70 changes: 48 additions & 22 deletions kernel/shared/src/main/scala/cats/effect/kernel/GenTemporal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,23 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] {
productL(fa)(sleep(time))

/**
* Returns an effect that either completes with the result of the source within the specified
* time `duration` or otherwise evaluates the `fallback`.
* Returns an effect that either completes with the result of the source or otherwise
* evaluates the `fallback`.
*
* The source is canceled in the event that it takes longer than the specified time duration
* to complete. Once the source has been successfully canceled (and has completed its
* finalizers), the fallback will be sequenced. If the source is uncancelable, the resulting
* effect will wait for it to complete before evaluating the fallback.
* The source is raised against the timeout `duration`, and its cancelation is triggered if
* the source doesn't complete within the specified time. The resulting effect will always
* wait for the source effect to complete (and to complete its finalizers), and will return
* the source's outcome over sequencing the `fallback`.
*
* In case source and timeout complete simultaneously, the result of the source will be
* returned over sequencing the `fallback`.
*
* If the source in uncancelable, `fallback` will never be evaluated.
*
* @param duration
* The time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, the `fallback` gets evaluated
* The time span for which we wait for the source to complete before triggering its
* cancelation; in the event that the specified time has passed without the source
* completing, the `fallback` gets evaluated
*
* @param fallback
* The task evaluated after the duration has passed and the source canceled
Expand All @@ -91,33 +97,53 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] {
handleDuration(duration, fa)(timeoutTo(fa, _, fallback))

protected def timeoutTo[A](fa: F[A], duration: FiniteDuration, fallback: F[A]): F[A] =
flatMap(race(fa, sleep(duration))) {
case Left(a) => pure(a)
case Right(_) => fallback
uncancelable { poll =>
implicit val F: GenTemporal[F, E] = this

poll(racePair(fa, sleep(duration))) flatMap {
case Left((oc, f)) => f.cancel *> oc.embed(poll(F.canceled) *> F.never)
case Right((f, _)) => f.cancel *> f.join.flatMap { oc => oc.embed(fallback) }
}
}

/**
* Returns an effect that either completes with the result of the source within the specified
* time `duration` or otherwise raises a `TimeoutException`.
* Returns an effect that either completes with the result of the source or raises a
* `TimeoutException`.
*
* The source is canceled in the event that it takes longer than the specified time duration
* to complete. Once the source has been successfully canceled (and has completed its
* finalizers), the `TimeoutException` will be raised. If the source is uncancelable, the
* resulting effect will wait for it to complete before raising the exception.
* The source is raced against the timeout `duration`, and its cancelation is triggered if the
* source doesn't complete within the specified time. The resulting effect will always wait
* for the source effect to complete (and to complete its finalizers), and will return the
* source's outcome over raising a `TimeoutException`.
*
* In case source and timeout complete simultaneously, the result of the source will be
* returned over raising a `TimeoutException`.
*
* If the source effect is uncancelable, a `TimeoutException` will never be raised.
*
* @param duration
* The time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, a `TimeoutException` is raised
* The time span for which we wait for the source to complete before triggering its
* cancelation; in the event that the specified time has passed without the source
* completing, a `TimeoutException` is raised
* @see
* [[timeoutAndForget[A](fa:F[A],duration:scala\.concurrent\.duration\.Duration)* timeoutAndForget]]
* for a variant which does not wait for cancelation of the source effect to complete.
*/
def timeout[A](fa: F[A], duration: Duration)(implicit ev: TimeoutException <:< E): F[A] = {
handleDuration(duration, fa)(timeout(fa, _))
}

protected def timeout[A](fa: F[A], duration: FiniteDuration)(
implicit ev: TimeoutException <:< E): F[A] = {
flatMap(race(fa, sleep(duration))) {
case Left(a) => pure(a)
case Right(_) => raiseError[A](ev(new TimeoutException(duration.toString())))
uncancelable { poll =>
implicit val F: GenTemporal[F, E] = this

poll(racePair(fa, sleep(duration))) flatMap {
case Left((oc, f)) => f.cancel *> oc.embed(poll(F.canceled) *> F.never)
case Right((f, _)) =>
f.cancel *> f.join.flatMap { oc =>
oc.embed(raiseError[A](ev(new TimeoutException(duration.toString()))))
}
}
}
}

Expand Down
89 changes: 74 additions & 15 deletions laws/shared/src/test/scala/cats/effect/laws/GenTemporalSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ package cats
package effect
package laws

import cats.effect.kernel.Temporal
import cats.effect.kernel.{Outcome, Temporal}
import cats.effect.kernel.testkit.TimeT
import cats.effect.kernel.testkit.pure._
import cats.syntax.all._

import org.specs2.mutable.Specification

import scala.concurrent.TimeoutException
import scala.concurrent.duration._
// import scala.concurrent.TimeoutException

class GenTemporalSpec extends Specification { outer =>

Expand All @@ -43,6 +43,40 @@ class GenTemporalSpec extends Specification { outer =>
val fa = F.pure(true)
F.timeout(fa, Duration.Inf) mustEqual fa
}

"succeed on a fast action" in {
val fa: TimeT[F, Boolean] = F.pure(true)
val op = F.timeout(fa, Duration.Zero)

run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true))
}

"error out on a slow action" in {
val fa: TimeT[F, Boolean] = F.never *> F.pure(true)
val op = F.timeout(fa, Duration.Zero)

run(TimeT.run(op)) must beLike {
case Outcome.Errored(e) => e must haveClass[TimeoutException]
}
}

"propagate successful outcome of uncancelable action" in {
val fa = F.uncancelable(_ => F.sleep(50.millis) *> F.pure(true))
val op = F.timeout(fa, Duration.Zero)

run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true))
}

"propagate errors from uncancelable action" in {
val fa = F.uncancelable { _ =>
F.sleep(50.millis) *> F.raiseError(new RuntimeException("fa failed")) *> F.pure(true)
}
val op = F.timeout(fa, Duration.Zero)

run(TimeT.run(op)) must beLike {
case Outcome.Errored(e: RuntimeException) => e.getMessage mustEqual "fa failed"
}
}
}

"timeoutTo" should {
Expand All @@ -51,6 +85,44 @@ class GenTemporalSpec extends Specification { outer =>
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
F.timeoutTo(fa, Duration.Inf, fallback) mustEqual fa
}

"succeed on a fast action" in {
val fa: TimeT[F, Boolean] = F.pure(true)
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
val op = F.timeoutTo(fa, Duration.Zero, fallback)

run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true))
}

"error out on a slow action" in {
val fa: TimeT[F, Boolean] = F.never *> F.pure(true)
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
val op = F.timeoutTo(fa, Duration.Zero, fallback)

run(TimeT.run(op)) must beLike {
case Outcome.Errored(e) => e must haveClass[RuntimeException]
}
}

"propagate successful outcome of uncancelable action" in {
val fa = F.uncancelable(_ => F.sleep(50.millis) *> F.pure(true))
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
val op = F.timeoutTo(fa, Duration.Zero, fallback)

run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true))
}

"propagate errors from uncancelable action" in {
val fa = F.uncancelable { _ =>
F.sleep(50.millis) *> F.raiseError(new RuntimeException("fa failed")) *> F.pure(true)
}
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
val op = F.timeoutTo(fa, Duration.Zero, fallback)

run(TimeT.run(op)) must beLike {
case Outcome.Errored(e: RuntimeException) => e.getMessage mustEqual "fa failed"
}
}
}

"timeoutAndForget" should {
Expand All @@ -63,13 +135,6 @@ class GenTemporalSpec extends Specification { outer =>

// TODO enable these tests once Temporal for TimeT is fixed
/*"temporal" should {
"timeout" should {
"succeed" in {
val op = F.timeout(F.pure(true), 10.seconds)

run(TimeT.run(op)) mustEqual Succeeded(Some(true))
}.pendingUntilFixed

"cancel a loop" in {
val op: TimeT[F, Either[Throwable, Unit]] = F.timeout(loop, 5.millis).attempt

Expand All @@ -80,12 +145,6 @@ class GenTemporalSpec extends Specification { outer =>
}

"timeoutTo" should {
"succeed" in {
val op: TimeT[F, Boolean] = F.timeoutTo(F.pure(true), 5.millis, F.raiseError(new RuntimeException))

run(TimeT.run(op)) mustEqual Succeeded(Some(true))
}.pendingUntilFixed

"use fallback" in {
val op: TimeT[F, Boolean] = F.timeoutTo(loop >> F.pure(false), 5.millis, F.pure(true))

Expand Down
17 changes: 17 additions & 0 deletions tests/shared/src/test/scala/cats/effect/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,23 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
"non-terminate on an uncancelable fiber" in ticked { implicit ticker =>
IO.never.uncancelable.timeout(1.second) must nonTerminate
}

"propagate successful result from a completed effect" in real {
IO.pure(true).delayBy(50.millis).uncancelable.timeout(10.millis).map { res =>
res must beTrue
}
}

"propagate error from a completed effect" in real {
IO.raiseError(new RuntimeException)
.delayBy(50.millis)
.uncancelable
.timeout(10.millis)
.attempt
.map { res =>
res must beLike { case Left(e) => e must haveClass[RuntimeException] }
}
}
}

"timeoutTo" should {
Expand Down
Loading