Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Async#asyncCheckAttempt for #3087 #3091

Merged
merged 7 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 88 additions & 3 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1072,10 +1072,81 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
def defer[A](thunk: => IO[A]): IO[A] =
delay(thunk).flatten

/**
* Suspends an asynchronous side effect with optional immediate result in `IO`.
*
* The given function `k` will be invoked during evaluation of the `IO` to:
* - check if result is already available;
* - "schedule" the asynchronous callback, where the callback of type `Either[Throwable, A]
* \=> Unit` is the parameter passed to that function. Only the ''first'' invocation of
* the callback will be effective! All subsequent invocations will be silently dropped.
*
* The process of registering the callback itself is suspended in `IO` (the outer `IO` of
* `IO[Either[Option[IO[Unit]], A]]`).
*
* The effect returns `Either[Option[IO[Unit]], A]` where:
* - right side `A` is an immediate result of computation (callback invocation will be
* dropped);
* - left side `Option[IO[Unit]] `is an optional finalizer to be run in the event that the
* fiber running `asyncCheckAttempt(k)` is canceled.
*
* For example, here is a simplified version of `IO.fromCompletableFuture`:
*
* {{{
* def fromCompletableFuture[A](fut: IO[CompletableFuture[A]]): IO[A] = {
* fut.flatMap { cf =>
* IO.asyncCheckAttempt { cb =>
* if (cf.isDone) {
* //Register immediately available result of the completable future or handle an error
* IO(cf.get)
* .map(Right(_))
* .handleError { e =>
* cb(Left(e))
* Left(None)
* }
* } else {
* IO {
* //Invoke the callback with the result of the completable future
* val stage = cf.handle[Unit] {
* case (a, null) => cb(Right(a))
* case (_, e) => cb(Left(e))
* }
*
* //Cancel the completable future if the fiber is canceled
* Left(Some(IO(stage.cancel(false)).void))
* }
* }
* }
* }
* }
* }}}
*
* Note that `asyncCheckAttempt` is uncancelable during its registration.
*
* @see
* [[async]] for a simplified variant without an option for immediate result
*/
def asyncCheckAttempt[A](
k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] = {
val body = new Cont[IO, A, A] {
def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) =>
G.uncancelable { poll =>
lift(k(resume)) flatMap {
case Right(a) => G.pure(a)
case Left(Some(fin)) => G.onCancel(poll(get), lift(fin))
case Left(None) => poll(get)
}
}
}
}

IOCont(body, Tracing.calculateTracingEvent(k))
}

/**
* Suspends an asynchronous side effect in `IO`.
*
* The given function will be invoked during evaluation of the `IO` to "schedule" the
* The given function `k` will be invoked during evaluation of the `IO` to "schedule" the
* asynchronous callback, where the callback of type `Either[Throwable, A] => Unit` is the
* parameter passed to that function. Only the ''first'' invocation of the callback will be
* effective! All subsequent invocations will be silently dropped.
Expand Down Expand Up @@ -1107,8 +1178,13 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
* }
* }}}
*
* Note that `async` is uncancelable during its registration.
*
* @see
* [[async_]] for a simplified variant without a finalizer
* @see
* [[asyncCheckAttempt]] for more generic version providing an optional immediate result of
* computation
*/
def async[A](k: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]): IO[A] = {
val body = new Cont[IO, A, A] {
Expand All @@ -1128,7 +1204,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
/**
* Suspends an asynchronous side effect in `IO`.
*
* The given function will be invoked during evaluation of the `IO` to "schedule" the
* The given function `k` will be invoked during evaluation of the `IO` to "schedule" the
* asynchronous callback, where the callback is the parameter passed to that function. Only
* the ''first'' invocation of the callback will be effective! All subsequent invocations will
* be silently dropped.
Expand All @@ -1155,8 +1231,13 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
* This function can be thought of as a safer, lexically-constrained version of `Promise`,
* where `IO` is like a safer, lazy version of `Future`.
*
* Also, note that `async` is uncancelable during its registration.
*
* @see
* [[async]]
* [[async]] for more generic version providing a finalizer
* @see
* [[asyncCheckAttempt]] for more generic version providing an optional immediate result of
* computation and a finalizer
*/
def async_[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = {
val body = new Cont[IO, A, A] {
Expand Down Expand Up @@ -1610,6 +1691,10 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
private[this] val _asyncForIO: kernel.Async[IO] = new kernel.Async[IO]
with StackSafeMonad[IO] {

override def asyncCheckAttempt[A](
k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] =
IO.asyncCheckAttempt(k)

override def async[A](k: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]): IO[A] =
IO.async(k)

Expand Down
87 changes: 70 additions & 17 deletions kernel/shared/src/main/scala/cats/effect/kernel/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,39 @@ import java.util.concurrent.atomic.AtomicReference
trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {

/**
* The asynchronous FFI.
* Suspends an asynchronous side effect with optional immediate result in `F`.
*
* `k` takes a callback of type `Either[Throwable, A] => Unit` to signal the result of the
* asynchronous computation. The execution of `async(k)` is semantically blocked until the
* callback is invoked.
* The given function `k` will be invoked during evaluation of `F` to:
* - check if result is already available;
* - "schedule" the asynchronous callback, where the callback of type `Either[Throwable, A]
* \=> Unit` is the parameter passed to that function. Only the ''first'' invocation of
* the callback will be effective! All subsequent invocations will be silently dropped.
*
* `k` returns an `Option[F[Unit]]` which is an optional finalizer to be run in the event that
* the fiber running `async(k)` is canceled. If passed `k` is `None`, then created effect will
* be uncancelable.
* The process of registering the callback itself is suspended in `F` (the outer `F` of
* `F[Either[Option[G[Unit]], A]]`).
*
* Also, note that `async` is uncancelable during its registration.
* The effect returns `Either[Option[F[Unit]], A]` where:
* - right side `A` is an immediate result of computation (callback invocation will be
* dropped);
* - left side `Option[F[Unit]] `is an optional finalizer to be run in the event that the
* fiber running `asyncCheckAttempt(k)` is canceled.
*
* Also, note that `asyncCheckAttempt` is uncancelable during its registration.
*
* @see
* [[async]] for a simplified variant without an option for immediate result
* @see
* [[async_]] for a simplified variant without an option for immediate result or finalizer
*/
def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = {
def asyncCheckAttempt[A](
k: (Either[Throwable, A] => Unit) => F[Either[Option[F[Unit]], A]]): F[A] = {
val body = new Cont[F, A, A] {
def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) =>
G.uncancelable { poll =>
lift(k(resume)) flatMap {
case Some(fin) => G.onCancel(poll(get), lift(fin))
case None => poll(get)
case Right(a) => G.pure(a)
case Left(Some(fin)) => G.onCancel(poll(get), lift(fin))
case Left(None) => poll(get)
}
}
}
Expand All @@ -90,9 +104,48 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
}

/**
* A convenience version of [[Async.async]] for when we don't need to perform `F[_]` effects
* or perform finalization in the event of cancelation (i.e. created effect will be
* uncancelable).
* Suspends an asynchronous side effect in `F`.
*
* The given function `k` will be invoked during evaluation of the `F` to "schedule" the
* asynchronous callback, where the callback of type `Either[Throwable, A] => Unit` is the
* parameter passed to that function. Only the ''first'' invocation of the callback will be
* effective! All subsequent invocations will be silently dropped.
*
* The process of registering the callback itself is suspended in `F` (the outer `F` of
* `F[Option[F[Unit]]]`).
*
* The effect returns `Option[F[Unit]]` which is an optional finalizer to be run in the event
* that the fiber running `async(k)` is canceled.
*
* Also, note that `async` is uncancelable during its registration.
*
* @see
* [[async_]] for a simplified variant without a finalizer
* @see
* [[asyncCheckAttempt]] for more generic version with option of providing immediate result
* of computation
*/
def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] =
asyncCheckAttempt[A](cb => map(k(cb))(Left(_)))

/**
* Suspends an asynchronous side effect in `F`.
*
* The given function `k` will be invoked during evaluation of the `F` to "schedule" the
* asynchronous callback, where the callback is the parameter passed to that function. Only
* the ''first'' invocation of the callback will be effective! All subsequent invocations will
* be silently dropped.
*
* This function can be thought of as a safer, lexically-constrained version of `Promise`,
* where `IO` is like a safer, lazy version of `Future`.
*
* Also, note that `async` is uncancelable during its registration.
*
* @see
* [[async]] for more generic version providing a finalizer
* @see
* [[asyncCheckAttempt]] for more generic version with option of providing immediate result
* of computation and finalizer
*/
def async_[A](k: (Either[Throwable, A] => Unit) => Unit): F[A] =
async[A](cb => as(delay(k(cb)), None))
Expand Down Expand Up @@ -183,9 +236,9 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
* NOTE: This is a very low level api, end users should use `async` instead.
* See cats.effect.kernel.Cont for more detail.
*
* If you are an implementor, and you have `async`, `Async.defaultCont`
* provides an implementation of `cont` in terms of `async`.
* Note that if you use `defaultCont` you _have_ to override `async`.
* If you are an implementor, and you have `async` or `asyncCheckAttempt`,
* `Async.defaultCont` provides an implementation of `cont` in terms of `async`.
* Note that if you use `defaultCont` you _have_ to override `async/asyncCheckAttempt`.
*/
def cont[K, R](body: Cont[F, K, R]): F[R]
}
Expand Down
15 changes: 15 additions & 0 deletions laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ import scala.util.{Left, Right}
trait AsyncLaws[F[_]] extends GenTemporalLaws[F, Throwable] with SyncLaws[F] {
implicit val F: Async[F]

// format: off
def asyncCheckAttemptImmediateIsPure[A](a: A) =
(F.asyncCheckAttempt[A](_ => F.pure(Right(a))) <* F.unit) <-> (F.pure(a))
// format: on

// format: off
def asyncCheckAttemptSuspendedRightIsAsyncRight[A](a: A, fu: F[Unit]) =
(F.asyncCheckAttempt[A](k => F.delay(k(Right(a))) >> fu.as(Left(None))) <* F.unit) <-> (F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <* F.unit)
// format: on

// format: off
def asyncCheckAttemptSuspendedLeftIsAsyncLeft[A](e: Throwable, fu: F[Unit]) =
(F.asyncCheckAttempt[A](k => F.delay(k(Left(e))) >> fu.as(Left(None))) <* F.unit) <-> (F.async[A](k => F.delay(k(Left(e))) >> fu.as(None)) <* F.unit)
// format: on

// format: off
def asyncRightIsUncancelableSequencedPure[A](a: A, fu: F[Unit]) =
(F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <* F.unit) <-> (F.uncancelable(_ => fu) >> F.pure(a))
Expand Down
6 changes: 6 additions & 0 deletions laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ trait AsyncTests[F[_]] extends GenTemporalTests[F, Throwable] with SyncTests[F]
)

val props = Seq(
"asyncCheckAttempt immediate is pure" -> forAll(
laws.asyncCheckAttemptImmediateIsPure[A] _),
"asyncCheckAttempt suspended right is async right" -> forAll(
laws.asyncCheckAttemptSuspendedRightIsAsyncRight[A] _),
"asyncCheckAttempt suspended left is async left" -> forAll(
laws.asyncCheckAttemptSuspendedLeftIsAsyncLeft[A] _),
"async right is uncancelable sequenced pure" -> forAll(
laws.asyncRightIsUncancelableSequencedPure[A] _),
"async left is uncancelable sequenced raiseError" -> forAll(
Expand Down
Loading