Skip to content

Commit

Permalink
Merge pull request #3091 from seigert/add-async-poll
Browse files Browse the repository at this point in the history
Add `Async#asyncCheckAttempt` for #3087
  • Loading branch information
djspiewak authored Nov 14, 2022
2 parents 832079a + 719c106 commit fa3324a
Show file tree
Hide file tree
Showing 6 changed files with 506 additions and 20 deletions.
91 changes: 88 additions & 3 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1067,10 +1067,81 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
def defer[A](thunk: => IO[A]): IO[A] =
delay(thunk).flatten

/**
* Suspends an asynchronous side effect with optional immediate result in `IO`.
*
* The given function `k` will be invoked during evaluation of the `IO` to:
* - check if result is already available;
* - "schedule" the asynchronous callback, where the callback of type `Either[Throwable, A]
* \=> Unit` is the parameter passed to that function. Only the ''first'' invocation of
* the callback will be effective! All subsequent invocations will be silently dropped.
*
* The process of registering the callback itself is suspended in `IO` (the outer `IO` of
* `IO[Either[Option[IO[Unit]], A]]`).
*
* The effect returns `Either[Option[IO[Unit]], A]` where:
* - right side `A` is an immediate result of computation (callback invocation will be
* dropped);
* - left side `Option[IO[Unit]] `is an optional finalizer to be run in the event that the
* fiber running `asyncCheckAttempt(k)` is canceled.
*
* For example, here is a simplified version of `IO.fromCompletableFuture`:
*
* {{{
* def fromCompletableFuture[A](fut: IO[CompletableFuture[A]]): IO[A] = {
* fut.flatMap { cf =>
* IO.asyncCheckAttempt { cb =>
* if (cf.isDone) {
* //Register immediately available result of the completable future or handle an error
* IO(cf.get)
* .map(Right(_))
* .handleError { e =>
* cb(Left(e))
* Left(None)
* }
* } else {
* IO {
* //Invoke the callback with the result of the completable future
* val stage = cf.handle[Unit] {
* case (a, null) => cb(Right(a))
* case (_, e) => cb(Left(e))
* }
*
* //Cancel the completable future if the fiber is canceled
* Left(Some(IO(stage.cancel(false)).void))
* }
* }
* }
* }
* }
* }}}
*
* Note that `asyncCheckAttempt` is uncancelable during its registration.
*
* @see
* [[async]] for a simplified variant without an option for immediate result
*/
def asyncCheckAttempt[A](
k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] = {
val body = new Cont[IO, A, A] {
def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) =>
G.uncancelable { poll =>
lift(k(resume)) flatMap {
case Right(a) => G.pure(a)
case Left(Some(fin)) => G.onCancel(poll(get), lift(fin))
case Left(None) => poll(get)
}
}
}
}

IOCont(body, Tracing.calculateTracingEvent(k))
}

/**
* Suspends an asynchronous side effect in `IO`.
*
* The given function will be invoked during evaluation of the `IO` to "schedule" the
* The given function `k` will be invoked during evaluation of the `IO` to "schedule" the
* asynchronous callback, where the callback of type `Either[Throwable, A] => Unit` is the
* parameter passed to that function. Only the ''first'' invocation of the callback will be
* effective! All subsequent invocations will be silently dropped.
Expand Down Expand Up @@ -1102,8 +1173,13 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
* }
* }}}
*
* Note that `async` is uncancelable during its registration.
*
* @see
* [[async_]] for a simplified variant without a finalizer
* @see
* [[asyncCheckAttempt]] for more generic version providing an optional immediate result of
* computation
*/
def async[A](k: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]): IO[A] = {
val body = new Cont[IO, A, A] {
Expand All @@ -1123,7 +1199,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
/**
* Suspends an asynchronous side effect in `IO`.
*
* The given function will be invoked during evaluation of the `IO` to "schedule" the
* The given function `k` will be invoked during evaluation of the `IO` to "schedule" the
* asynchronous callback, where the callback is the parameter passed to that function. Only
* the ''first'' invocation of the callback will be effective! All subsequent invocations will
* be silently dropped.
Expand All @@ -1150,8 +1226,13 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
* This function can be thought of as a safer, lexically-constrained version of `Promise`,
* where `IO` is like a safer, lazy version of `Future`.
*
* Also, note that `async` is uncancelable during its registration.
*
* @see
* [[async]]
* [[async]] for more generic version providing a finalizer
* @see
* [[asyncCheckAttempt]] for more generic version providing an optional immediate result of
* computation and a finalizer
*/
def async_[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = {
val body = new Cont[IO, A, A] {
Expand Down Expand Up @@ -1605,6 +1686,10 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
private[this] val _asyncForIO: kernel.Async[IO] = new kernel.Async[IO]
with StackSafeMonad[IO] {

override def asyncCheckAttempt[A](
k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] =
IO.asyncCheckAttempt(k)

override def async[A](k: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]): IO[A] =
IO.async(k)

Expand Down
87 changes: 70 additions & 17 deletions kernel/shared/src/main/scala/cats/effect/kernel/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,39 @@ import java.util.concurrent.atomic.AtomicReference
trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {

/**
* The asynchronous FFI.
* Suspends an asynchronous side effect with optional immediate result in `F`.
*
* `k` takes a callback of type `Either[Throwable, A] => Unit` to signal the result of the
* asynchronous computation. The execution of `async(k)` is semantically blocked until the
* callback is invoked.
* The given function `k` will be invoked during evaluation of `F` to:
* - check if result is already available;
* - "schedule" the asynchronous callback, where the callback of type `Either[Throwable, A]
* \=> Unit` is the parameter passed to that function. Only the ''first'' invocation of
* the callback will be effective! All subsequent invocations will be silently dropped.
*
* `k` returns an `Option[F[Unit]]` which is an optional finalizer to be run in the event that
* the fiber running `async(k)` is canceled. If passed `k` is `None`, then created effect will
* be uncancelable.
* The process of registering the callback itself is suspended in `F` (the outer `F` of
* `F[Either[Option[G[Unit]], A]]`).
*
* Also, note that `async` is uncancelable during its registration.
* The effect returns `Either[Option[F[Unit]], A]` where:
* - right side `A` is an immediate result of computation (callback invocation will be
* dropped);
* - left side `Option[F[Unit]] `is an optional finalizer to be run in the event that the
* fiber running `asyncCheckAttempt(k)` is canceled.
*
* Also, note that `asyncCheckAttempt` is uncancelable during its registration.
*
* @see
* [[async]] for a simplified variant without an option for immediate result
* @see
* [[async_]] for a simplified variant without an option for immediate result or finalizer
*/
def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = {
def asyncCheckAttempt[A](
k: (Either[Throwable, A] => Unit) => F[Either[Option[F[Unit]], A]]): F[A] = {
val body = new Cont[F, A, A] {
def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) =>
G.uncancelable { poll =>
lift(k(resume)) flatMap {
case Some(fin) => G.onCancel(poll(get), lift(fin))
case None => poll(get)
case Right(a) => G.pure(a)
case Left(Some(fin)) => G.onCancel(poll(get), lift(fin))
case Left(None) => poll(get)
}
}
}
Expand All @@ -90,9 +104,48 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
}

/**
* A convenience version of [[Async.async]] for when we don't need to perform `F[_]` effects
* or perform finalization in the event of cancelation (i.e. created effect will be
* uncancelable).
* Suspends an asynchronous side effect in `F`.
*
* The given function `k` will be invoked during evaluation of the `F` to "schedule" the
* asynchronous callback, where the callback of type `Either[Throwable, A] => Unit` is the
* parameter passed to that function. Only the ''first'' invocation of the callback will be
* effective! All subsequent invocations will be silently dropped.
*
* The process of registering the callback itself is suspended in `F` (the outer `F` of
* `F[Option[F[Unit]]]`).
*
* The effect returns `Option[F[Unit]]` which is an optional finalizer to be run in the event
* that the fiber running `async(k)` is canceled.
*
* Also, note that `async` is uncancelable during its registration.
*
* @see
* [[async_]] for a simplified variant without a finalizer
* @see
* [[asyncCheckAttempt]] for more generic version with option of providing immediate result
* of computation
*/
def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] =
asyncCheckAttempt[A](cb => map(k(cb))(Left(_)))

/**
* Suspends an asynchronous side effect in `F`.
*
* The given function `k` will be invoked during evaluation of the `F` to "schedule" the
* asynchronous callback, where the callback is the parameter passed to that function. Only
* the ''first'' invocation of the callback will be effective! All subsequent invocations will
* be silently dropped.
*
* This function can be thought of as a safer, lexically-constrained version of `Promise`,
* where `IO` is like a safer, lazy version of `Future`.
*
* Also, note that `async` is uncancelable during its registration.
*
* @see
* [[async]] for more generic version providing a finalizer
* @see
* [[asyncCheckAttempt]] for more generic version with option of providing immediate result
* of computation and finalizer
*/
def async_[A](k: (Either[Throwable, A] => Unit) => Unit): F[A] =
async[A](cb => as(delay(k(cb)), None))
Expand Down Expand Up @@ -183,9 +236,9 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
* NOTE: This is a very low level api, end users should use `async` instead.
* See cats.effect.kernel.Cont for more detail.
*
* If you are an implementor, and you have `async`, `Async.defaultCont`
* provides an implementation of `cont` in terms of `async`.
* Note that if you use `defaultCont` you _have_ to override `async`.
* If you are an implementor, and you have `async` or `asyncCheckAttempt`,
* `Async.defaultCont` provides an implementation of `cont` in terms of `async`.
* Note that if you use `defaultCont` you _have_ to override `async/asyncCheckAttempt`.
*/
def cont[K, R](body: Cont[F, K, R]): F[R]
}
Expand Down
15 changes: 15 additions & 0 deletions laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ import scala.util.{Left, Right}
trait AsyncLaws[F[_]] extends GenTemporalLaws[F, Throwable] with SyncLaws[F] {
implicit val F: Async[F]

// format: off
def asyncCheckAttemptImmediateIsPure[A](a: A) =
(F.asyncCheckAttempt[A](_ => F.pure(Right(a))) <* F.unit) <-> (F.pure(a))
// format: on

// format: off
def asyncCheckAttemptSuspendedRightIsAsyncRight[A](a: A, fu: F[Unit]) =
(F.asyncCheckAttempt[A](k => F.delay(k(Right(a))) >> fu.as(Left(None))) <* F.unit) <-> (F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <* F.unit)
// format: on

// format: off
def asyncCheckAttemptSuspendedLeftIsAsyncLeft[A](e: Throwable, fu: F[Unit]) =
(F.asyncCheckAttempt[A](k => F.delay(k(Left(e))) >> fu.as(Left(None))) <* F.unit) <-> (F.async[A](k => F.delay(k(Left(e))) >> fu.as(None)) <* F.unit)
// format: on

// format: off
def asyncRightIsUncancelableSequencedPure[A](a: A, fu: F[Unit]) =
(F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <* F.unit) <-> (F.uncancelable(_ => fu) >> F.pure(a))
Expand Down
6 changes: 6 additions & 0 deletions laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ trait AsyncTests[F[_]] extends GenTemporalTests[F, Throwable] with SyncTests[F]
)

val props = Seq(
"asyncCheckAttempt immediate is pure" -> forAll(
laws.asyncCheckAttemptImmediateIsPure[A] _),
"asyncCheckAttempt suspended right is async right" -> forAll(
laws.asyncCheckAttemptSuspendedRightIsAsyncRight[A] _),
"asyncCheckAttempt suspended left is async left" -> forAll(
laws.asyncCheckAttemptSuspendedLeftIsAsyncLeft[A] _),
"async right is uncancelable sequenced pure" -> forAll(
laws.asyncRightIsUncancelableSequencedPure[A] _),
"async left is uncancelable sequenced raiseError" -> forAll(
Expand Down
Loading

0 comments on commit fa3324a

Please sign in to comment.