diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 1fb2522407..6b81ccc365 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -1072,10 +1072,81 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { def defer[A](thunk: => IO[A]): IO[A] = delay(thunk).flatten + /** + * Suspends an asynchronous side effect with optional immediate result in `IO`. + * + * The given function `k` will be invoked during evaluation of the `IO` to: + * - check if result is already available; + * - "schedule" the asynchronous callback, where the callback of type `Either[Throwable, A] + * \=> Unit` is the parameter passed to that function. Only the ''first'' invocation of + * the callback will be effective! All subsequent invocations will be silently dropped. + * + * The process of registering the callback itself is suspended in `IO` (the outer `IO` of + * `IO[Either[Option[IO[Unit]], A]]`). + * + * The effect returns `Either[Option[IO[Unit]], A]` where: + * - right side `A` is an immediate result of computation (callback invocation will be + * dropped); + * - left side `Option[IO[Unit]] `is an optional finalizer to be run in the event that the + * fiber running `asyncCheckAttempt(k)` is canceled. + * + * For example, here is a simplified version of `IO.fromCompletableFuture`: + * + * {{{ + * def fromCompletableFuture[A](fut: IO[CompletableFuture[A]]): IO[A] = { + * fut.flatMap { cf => + * IO.asyncCheckAttempt { cb => + * if (cf.isDone) { + * //Register immediately available result of the completable future or handle an error + * IO(cf.get) + * .map(Right(_)) + * .handleError { e => + * cb(Left(e)) + * Left(None) + * } + * } else { + * IO { + * //Invoke the callback with the result of the completable future + * val stage = cf.handle[Unit] { + * case (a, null) => cb(Right(a)) + * case (_, e) => cb(Left(e)) + * } + * + * //Cancel the completable future if the fiber is canceled + * Left(Some(IO(stage.cancel(false)).void)) + * } + * } + * } + * } + * } + * }}} + * + * Note that `asyncCheckAttempt` is uncancelable during its registration. + * + * @see + * [[async]] for a simplified variant without an option for immediate result + */ + def asyncCheckAttempt[A]( + k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] = { + val body = new Cont[IO, A, A] { + def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) => + G.uncancelable { poll => + lift(k(resume)) flatMap { + case Right(a) => G.pure(a) + case Left(Some(fin)) => G.onCancel(poll(get), lift(fin)) + case Left(None) => poll(get) + } + } + } + } + + IOCont(body, Tracing.calculateTracingEvent(k)) + } + /** * Suspends an asynchronous side effect in `IO`. * - * The given function will be invoked during evaluation of the `IO` to "schedule" the + * The given function `k` will be invoked during evaluation of the `IO` to "schedule" the * asynchronous callback, where the callback of type `Either[Throwable, A] => Unit` is the * parameter passed to that function. Only the ''first'' invocation of the callback will be * effective! All subsequent invocations will be silently dropped. @@ -1107,8 +1178,13 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { * } * }}} * + * Note that `async` is uncancelable during its registration. + * * @see * [[async_]] for a simplified variant without a finalizer + * @see + * [[asyncCheckAttempt]] for more generic version providing an optional immediate result of + * computation */ def async[A](k: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]): IO[A] = { val body = new Cont[IO, A, A] { @@ -1128,7 +1204,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { /** * Suspends an asynchronous side effect in `IO`. * - * The given function will be invoked during evaluation of the `IO` to "schedule" the + * The given function `k` will be invoked during evaluation of the `IO` to "schedule" the * asynchronous callback, where the callback is the parameter passed to that function. Only * the ''first'' invocation of the callback will be effective! All subsequent invocations will * be silently dropped. @@ -1155,8 +1231,13 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { * This function can be thought of as a safer, lexically-constrained version of `Promise`, * where `IO` is like a safer, lazy version of `Future`. * + * Also, note that `async` is uncancelable during its registration. + * * @see - * [[async]] + * [[async]] for more generic version providing a finalizer + * @see + * [[asyncCheckAttempt]] for more generic version providing an optional immediate result of + * computation and a finalizer */ def async_[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = { val body = new Cont[IO, A, A] { @@ -1610,6 +1691,10 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { private[this] val _asyncForIO: kernel.Async[IO] = new kernel.Async[IO] with StackSafeMonad[IO] { + override def asyncCheckAttempt[A]( + k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] = + IO.asyncCheckAttempt(k) + override def async[A](k: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]): IO[A] = IO.async(k) diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala b/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala index 58c60eaab6..fab8e244aa 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala @@ -62,25 +62,39 @@ import java.util.concurrent.atomic.AtomicReference trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] { /** - * The asynchronous FFI. + * Suspends an asynchronous side effect with optional immediate result in `F`. * - * `k` takes a callback of type `Either[Throwable, A] => Unit` to signal the result of the - * asynchronous computation. The execution of `async(k)` is semantically blocked until the - * callback is invoked. + * The given function `k` will be invoked during evaluation of `F` to: + * - check if result is already available; + * - "schedule" the asynchronous callback, where the callback of type `Either[Throwable, A] + * \=> Unit` is the parameter passed to that function. Only the ''first'' invocation of + * the callback will be effective! All subsequent invocations will be silently dropped. * - * `k` returns an `Option[F[Unit]]` which is an optional finalizer to be run in the event that - * the fiber running `async(k)` is canceled. If passed `k` is `None`, then created effect will - * be uncancelable. + * The process of registering the callback itself is suspended in `F` (the outer `F` of + * `F[Either[Option[G[Unit]], A]]`). * - * Also, note that `async` is uncancelable during its registration. + * The effect returns `Either[Option[F[Unit]], A]` where: + * - right side `A` is an immediate result of computation (callback invocation will be + * dropped); + * - left side `Option[F[Unit]] `is an optional finalizer to be run in the event that the + * fiber running `asyncCheckAttempt(k)` is canceled. + * + * Also, note that `asyncCheckAttempt` is uncancelable during its registration. + * + * @see + * [[async]] for a simplified variant without an option for immediate result + * @see + * [[async_]] for a simplified variant without an option for immediate result or finalizer */ - def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = { + def asyncCheckAttempt[A]( + k: (Either[Throwable, A] => Unit) => F[Either[Option[F[Unit]], A]]): F[A] = { val body = new Cont[F, A, A] { def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) => G.uncancelable { poll => lift(k(resume)) flatMap { - case Some(fin) => G.onCancel(poll(get), lift(fin)) - case None => poll(get) + case Right(a) => G.pure(a) + case Left(Some(fin)) => G.onCancel(poll(get), lift(fin)) + case Left(None) => poll(get) } } } @@ -90,9 +104,48 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] { } /** - * A convenience version of [[Async.async]] for when we don't need to perform `F[_]` effects - * or perform finalization in the event of cancelation (i.e. created effect will be - * uncancelable). + * Suspends an asynchronous side effect in `F`. + * + * The given function `k` will be invoked during evaluation of the `F` to "schedule" the + * asynchronous callback, where the callback of type `Either[Throwable, A] => Unit` is the + * parameter passed to that function. Only the ''first'' invocation of the callback will be + * effective! All subsequent invocations will be silently dropped. + * + * The process of registering the callback itself is suspended in `F` (the outer `F` of + * `F[Option[F[Unit]]]`). + * + * The effect returns `Option[F[Unit]]` which is an optional finalizer to be run in the event + * that the fiber running `async(k)` is canceled. + * + * Also, note that `async` is uncancelable during its registration. + * + * @see + * [[async_]] for a simplified variant without a finalizer + * @see + * [[asyncCheckAttempt]] for more generic version with option of providing immediate result + * of computation + */ + def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = + asyncCheckAttempt[A](cb => map(k(cb))(Left(_))) + + /** + * Suspends an asynchronous side effect in `F`. + * + * The given function `k` will be invoked during evaluation of the `F` to "schedule" the + * asynchronous callback, where the callback is the parameter passed to that function. Only + * the ''first'' invocation of the callback will be effective! All subsequent invocations will + * be silently dropped. + * + * This function can be thought of as a safer, lexically-constrained version of `Promise`, + * where `IO` is like a safer, lazy version of `Future`. + * + * Also, note that `async` is uncancelable during its registration. + * + * @see + * [[async]] for more generic version providing a finalizer + * @see + * [[asyncCheckAttempt]] for more generic version with option of providing immediate result + * of computation and finalizer */ def async_[A](k: (Either[Throwable, A] => Unit) => Unit): F[A] = async[A](cb => as(delay(k(cb)), None)) @@ -183,9 +236,9 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] { * NOTE: This is a very low level api, end users should use `async` instead. * See cats.effect.kernel.Cont for more detail. * - * If you are an implementor, and you have `async`, `Async.defaultCont` - * provides an implementation of `cont` in terms of `async`. - * Note that if you use `defaultCont` you _have_ to override `async`. + * If you are an implementor, and you have `async` or `asyncCheckAttempt`, + * `Async.defaultCont` provides an implementation of `cont` in terms of `async`. + * Note that if you use `defaultCont` you _have_ to override `async/asyncCheckAttempt`. */ def cont[K, R](body: Cont[F, K, R]): F[R] } diff --git a/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala b/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala index 6b76d0bb50..cfddeba102 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala @@ -26,6 +26,21 @@ import scala.util.{Left, Right} trait AsyncLaws[F[_]] extends GenTemporalLaws[F, Throwable] with SyncLaws[F] { implicit val F: Async[F] + // format: off + def asyncCheckAttemptImmediateIsPure[A](a: A) = + (F.asyncCheckAttempt[A](_ => F.pure(Right(a))) <* F.unit) <-> (F.pure(a)) + // format: on + + // format: off + def asyncCheckAttemptSuspendedRightIsAsyncRight[A](a: A, fu: F[Unit]) = + (F.asyncCheckAttempt[A](k => F.delay(k(Right(a))) >> fu.as(Left(None))) <* F.unit) <-> (F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <* F.unit) + // format: on + + // format: off + def asyncCheckAttemptSuspendedLeftIsAsyncLeft[A](e: Throwable, fu: F[Unit]) = + (F.asyncCheckAttempt[A](k => F.delay(k(Left(e))) >> fu.as(Left(None))) <* F.unit) <-> (F.async[A](k => F.delay(k(Left(e))) >> fu.as(None)) <* F.unit) + // format: on + // format: off def asyncRightIsUncancelableSequencedPure[A](a: A, fu: F[Unit]) = (F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <* F.unit) <-> (F.uncancelable(_ => fu) >> F.pure(a)) diff --git a/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala b/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala index c0cbe63e09..916846ded1 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala @@ -239,6 +239,12 @@ trait AsyncTests[F[_]] extends GenTemporalTests[F, Throwable] with SyncTests[F] ) val props = Seq( + "asyncCheckAttempt immediate is pure" -> forAll( + laws.asyncCheckAttemptImmediateIsPure[A] _), + "asyncCheckAttempt suspended right is async right" -> forAll( + laws.asyncCheckAttemptSuspendedRightIsAsyncRight[A] _), + "asyncCheckAttempt suspended left is async left" -> forAll( + laws.asyncCheckAttemptSuspendedLeftIsAsyncLeft[A] _), "async right is uncancelable sequenced pure" -> forAll( laws.asyncRightIsUncancelableSequencedPure[A] _), "async left is uncancelable sequenced raiseError" -> forAll( diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index 85f65fb55c..d4395f6b20 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -56,6 +56,20 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { i mustEqual 42 } + "preserve monad identity on asyncCheckAttempt immediate result" in ticked { + implicit ticker => + val fa = IO.asyncCheckAttempt[Int](_ => IO(Right(42))) + fa.flatMap(i => IO.pure(i)) must completeAs(42) + fa must completeAs(42) + } + + "preserve monad identity on asyncCheckAttempt suspended result" in ticked { + implicit ticker => + val fa = IO.asyncCheckAttempt[Int](cb => IO(cb(Right(42))).as(Left(None))) + fa.flatMap(i => IO.pure(i)) must completeAs(42) + fa must completeAs(42) + } + "preserve monad identity on async" in ticked { implicit ticker => val fa = IO.async[Int](cb => IO(cb(Right(42))).as(None)) fa.flatMap(i => IO.pure(i)) must completeAs(42) @@ -69,6 +83,12 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { IO(throw TestException) must failAs(TestException) } + "resume error continuation within asyncCheckAttempt" in ticked { implicit ticker => + case object TestException extends RuntimeException + IO.asyncCheckAttempt[Unit](k => IO(k(Left(TestException))).as(Left(None))) must failAs( + TestException) + } + "resume error continuation within async" in ticked { implicit ticker => case object TestException extends RuntimeException IO.async[Unit](k => IO(k(Left(TestException))).as(None)) must failAs(TestException) @@ -411,6 +431,164 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { } + "asyncCheckAttempt" should { + + "resume value continuation within asyncCheckAttempt with immediate result" in ticked { + implicit ticker => IO.asyncCheckAttempt[Int](_ => IO(Right(42))) must completeAs(42) + } + + "resume value continuation within asyncCheckAttempt with suspended result" in ticked { + implicit ticker => + IO.asyncCheckAttempt[Int](k => IO(k(Right(42))).as(Left(None))) must completeAs(42) + } + + "continue from the results of an asyncCheckAttempt immediate result produced prior to registration" in ticked { + implicit ticker => + val fa = IO.asyncCheckAttempt[Int](_ => IO(Right(42))).map(_ + 2) + fa must completeAs(44) + } + + "continue from the results of an asyncCheckAttempt suspended result produced prior to registration" in ticked { + implicit ticker => + val fa = IO.asyncCheckAttempt[Int](cb => IO(cb(Right(42))).as(Left(None))).map(_ + 2) + fa must completeAs(44) + } + + // format: off + "produce a failure when the registration raises an error after result" in ticked { implicit ticker => + case object TestException extends RuntimeException + + IO.asyncCheckAttempt[Int](_ => IO(Right(42)) + .flatMap(_ => IO.raiseError(TestException))) + .void must failAs(TestException) + } + // format: on + + // format: off + "produce a failure when the registration raises an error after callback" in ticked { implicit ticker => + case object TestException extends RuntimeException + + val fa = IO.asyncCheckAttempt[Int](cb => IO(cb(Right(42))) + .flatMap(_ => IO.raiseError(TestException))) + .void + fa must failAs(TestException) + } + // format: on + + "ignore asyncCheckAttempt callback" in ticked { implicit ticker => + case object TestException extends RuntimeException + + var cb: Either[Throwable, Int] => Unit = null + + val asyncCheckAttempt = IO.asyncCheckAttempt[Int] { cb0 => + IO { cb = cb0 } *> IO.pure(Right(42)) + } + + val test = for { + fiber <- asyncCheckAttempt.start + _ <- IO(ticker.ctx.tick()) + _ <- IO(cb(Right(43))) + _ <- IO(ticker.ctx.tick()) + _ <- IO(cb(Left(TestException))) + _ <- IO(ticker.ctx.tick()) + value <- fiber.joinWithNever + } yield value + + test must completeAs(42) + } + + "ignore asyncCheckAttempt callback real" in real { + case object TestException extends RuntimeException + + var cb: Either[Throwable, Int] => Unit = null + + val test = for { + latch1 <- Deferred[IO, Unit] + latch2 <- Deferred[IO, Unit] + fiber <- + IO.asyncCheckAttempt[Int] { cb0 => + IO { cb = cb0 } *> latch1.complete(()) *> latch2.get *> IO.pure(Right(42)) + }.start + _ <- latch1.get + _ <- IO(cb(Right(43))) + _ <- IO(cb(Left(TestException))) + _ <- latch2.complete(()) + value <- fiber.joinWithNever + } yield value + + test.attempt.flatMap { n => IO(n mustEqual Right(42)) } + } + + "repeated asyncCheckAttempt callback" in ticked { implicit ticker => + case object TestException extends RuntimeException + + var cb: Either[Throwable, Int] => Unit = null + + val asyncCheckAttempt = IO.asyncCheckAttempt[Int] { cb0 => + IO { cb = cb0 } *> IO.pure(Left(None)) + } + + val test = for { + fiber <- asyncCheckAttempt.start + _ <- IO(ticker.ctx.tick()) + _ <- IO(cb(Right(42))) + _ <- IO(ticker.ctx.tick()) + _ <- IO(cb(Right(43))) + _ <- IO(ticker.ctx.tick()) + _ <- IO(cb(Left(TestException))) + _ <- IO(ticker.ctx.tick()) + value <- fiber.joinWithNever + } yield value + + test must completeAs(42) + } + + "repeated asyncCheckAttempt callback real" in real { + case object TestException extends RuntimeException + + var cb: Either[Throwable, Int] => Unit = null + + val test = for { + latch1 <- Deferred[IO, Unit] + latch2 <- Deferred[IO, Unit] + fiber <- + IO.asyncCheckAttempt[Int] { cb0 => + IO { cb = cb0 } *> latch1.complete(()) *> latch2.get *> IO.pure(Left(None)) + }.start + _ <- latch1.get + _ <- IO(cb(Right(42))) + _ <- IO(cb(Right(43))) + _ <- IO(cb(Left(TestException))) + _ <- latch2.complete(()) + value <- fiber.joinWithNever + } yield value + + test.attempt.flatMap { n => IO(n mustEqual Right(42)) } + } + + "allow for misordered nesting" in ticked { implicit ticker => + var outerR = 0 + var innerR = 0 + + val outer = IO.asyncCheckAttempt[Int] { cb1 => + val inner = IO.asyncCheckAttempt[Int] { cb2 => + IO(cb1(Right(1))) *> + IO.executionContext + .flatMap(ec => IO(ec.execute(() => cb2(Right(2))))) + .as(Left(None)) + } + + inner.flatMap(i => IO { innerR = i }).as(Left(None)) + } + + val test = outer.flatMap(i => IO { outerR = i }) + + test must completeAs(()) + outerR mustEqual 1 + innerR mustEqual 2 + } + } + "async" should { "resume value continuation within async" in ticked { implicit ticker => diff --git a/tests/shared/src/test/scala/cats/effect/kernel/AsyncSpec.scala b/tests/shared/src/test/scala/cats/effect/kernel/AsyncSpec.scala new file mode 100644 index 0000000000..a2225e54de --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/kernel/AsyncSpec.scala @@ -0,0 +1,149 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package kernel + +import cats.{Eq, Order, StackSafeMonad} +import cats.arrow.FunctionK +import cats.effect.laws.AsyncTests +import cats.laws.discipline.arbitrary._ + +import org.scalacheck.{Arbitrary, Cogen, Prop} +import org.scalacheck.Arbitrary.arbitrary +import org.typelevel.discipline.specs2.mutable.Discipline + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + +class AsyncSpec extends BaseSpec with Discipline { + + // we just need this because of the laws testing, since the prop runs can interfere with each other + sequential + + { + implicit val ticker = Ticker() + + checkAll( + "AsyncIO", + AsyncTests[AsyncIO].async[Int, Int, Int](10.millis) + ) /*(Parameters(seed = Some(Seed.fromBase64("ZxDXpm7_3Pdkl-Fvt8M90Cxfam9wKuzcifQ1QsIJxND=").get)))*/ + } + + final class AsyncIO[A](val io: IO[A]) + + implicit + def asyncForAsyncIO: Async[AsyncIO] = new Async[AsyncIO] with StackSafeMonad[AsyncIO] { + def pure[A](x: A): AsyncIO[A] = liftIO(IO.pure(x)) + def raiseError[A](e: Throwable): AsyncIO[A] = liftIO(IO.raiseError(e)) + def suspend[A](hint: Sync.Type)(thunk: => A): AsyncIO[A] = liftIO(IO.suspend(hint)(thunk)) + + def canceled: AsyncIO[Unit] = liftIO(IO.canceled) + def cede: AsyncIO[Unit] = liftIO(IO.cede) + def executionContext: AsyncIO[ExecutionContext] = liftIO(IO.executionContext) + + def monotonic: AsyncIO[FiniteDuration] = liftIO(IO.monotonic) + def realTime: AsyncIO[FiniteDuration] = liftIO(IO.realTime) + + def ref[A](a: A): AsyncIO[Ref[AsyncIO, A]] = delay(Ref.unsafe(a)(this)) + def deferred[A]: AsyncIO[Deferred[AsyncIO, A]] = delay(Deferred.unsafe(this)) + + def evalOn[A](fa: AsyncIO[A], ec: ExecutionContext): AsyncIO[A] = wrapIO(fa)(_.evalOn(ec)) + + def flatMap[A, B](fa: AsyncIO[A])(f: A => AsyncIO[B]): AsyncIO[B] = + wrapIO(fa)(_.flatMap(f(_).io)) + + def forceR[A, B](fa: AsyncIO[A])(fb: AsyncIO[B]): AsyncIO[B] = wrapIO(fa)(_.forceR(fb.io)) + + def handleErrorWith[A](fa: AsyncIO[A])(f: Throwable => AsyncIO[A]): AsyncIO[A] = + wrapIO(fa)(_.handleErrorWith(f(_).io)) + + def onCancel[A](fa: AsyncIO[A], fin: AsyncIO[Unit]): AsyncIO[A] = + wrapIO(fa)(_.onCancel(fin.io)) + + def sleep(time: FiniteDuration): AsyncIO[Unit] = liftIO(IO.sleep(time)) + + def cont[K, R](body: Cont[AsyncIO, K, R]): AsyncIO[R] = { + val lower: FunctionK[AsyncIO, IO] = new FunctionK[AsyncIO, IO] { + def apply[A](fa: AsyncIO[A]): IO[A] = fa.io + } + + val ioCont = new Cont[IO, K, R] { + def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) => + body.apply[G].apply(resume, get, lower.andThen(lift)) + } + } + + liftIO(IO.cont(ioCont)) + } + + def start[A](fa: AsyncIO[A]): AsyncIO[Fiber[AsyncIO, Throwable, A]] = + liftIO(fa.io.start.map(liftFiber)) + + def uncancelable[A](body: Poll[AsyncIO] => AsyncIO[A]): AsyncIO[A] = + liftIO { + IO.uncancelable { nat => + val natT = new Poll[AsyncIO] { + def apply[B](aio: AsyncIO[B]): AsyncIO[B] = wrapIO(aio)(nat(_)) + } + body(natT).io + } + } + + private val liftIO: FunctionK[IO, AsyncIO] = new FunctionK[IO, AsyncIO] { + def apply[A](fa: IO[A]): AsyncIO[A] = new AsyncIO(fa) + } + + private def liftOutcome[A](oc: Outcome[IO, Throwable, A]): Outcome[AsyncIO, Throwable, A] = + oc match { + case Outcome.Canceled() => Outcome.Canceled() + case Outcome.Errored(e) => Outcome.Errored(e) + case Outcome.Succeeded(foa) => Outcome.Succeeded(liftIO(foa)) + } + + private def liftFiber[A](fib: FiberIO[A]): Fiber[AsyncIO, Throwable, A] = + new Fiber[AsyncIO, Throwable, A] { + def cancel: AsyncIO[Unit] = liftIO(fib.cancel) + def join: AsyncIO[Outcome[AsyncIO, Throwable, A]] = liftIO(fib.join.map(liftOutcome)) + } + + private def wrapIO[A, B](aio: AsyncIO[A])(f: IO[A] => IO[B]): AsyncIO[B] = + liftIO(f(aio.io)) + + } + + implicit def arbitraryAsyncIO[A](implicit arbIO: Arbitrary[IO[A]]): Arbitrary[AsyncIO[A]] = + Arbitrary(arbitrary[IO[A]].map(new AsyncIO(_))) + + implicit def cogenOutcomeAsyncIO[A]( + implicit + cogenOutcomeIO: Cogen[Outcome[IO, Throwable, A]]): Cogen[Outcome[AsyncIO, Throwable, A]] = + cogenOutcomeIO.contramap { + case Outcome.Canceled() => Outcome.Canceled() + case Outcome.Errored(e) => Outcome.Errored(e) + case Outcome.Succeeded(aio) => Outcome.Succeeded(aio.io) + } + + implicit def eqAsyncIO[A](implicit eqIO: Eq[IO[A]]): Eq[AsyncIO[A]] = (x, y) => + eqIO.eqv(x.io, y.io) + + implicit def orderAsyncIO[A](implicit orderIO: Order[IO[A]]): Order[AsyncIO[A]] = (x, y) => + orderIO.compare(x.io, y.io) + + implicit def execAsyncIO[A](implicit execIO: IO[A] => Prop): AsyncIO[A] => Prop = aio => + execIO(aio.io) + +}