From 732603eddec44409b55dc13b54be94228fc9debc Mon Sep 17 00:00:00 2001 From: Alexey Shuksto Date: Mon, 11 Jul 2022 21:10:41 +0300 Subject: [PATCH 1/6] Add `Async#asyncPoll` for #3087 --- .../src/main/scala/cats/effect/IO.scala | 75 ++++++++ .../main/scala/cats/effect/kernel/Async.scala | 34 +++- .../src/test/scala/cats/effect/IOSpec.scala | 165 ++++++++++++++++++ 3 files changed, 266 insertions(+), 8 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index c7cb95fb4c..7df441f35a 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -1064,6 +1064,76 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { def defer[A](thunk: => IO[A]): IO[A] = delay(thunk).flatten + /** + * Suspends an asynchronous side effect with optional immediate result in `IO`. + * + * The given function will be invoked during evaluation of the `IO` to: + * - check if result is already available; + * - "schedule" the asynchronous callback, where the callback of type + * `Either[Throwable, A] => Unit` is the parameter passed to that function. + * Only the ''first'' invocation of the callback will be effective! + * All subsequent invocations will be silently dropped. + * + * The process of registering the callback itself is suspended in `IO` (the outer `IO` of + * `IO[Either[Option[IO[Unit]], A]]`). + * + * The effect returns `Either[Option[IO[Unit]], A]` where: + * - right side `A` is an immediate result of computation (callback invocation will be + * dropped); + * - left side `Option[IO[Unit]] `is an optional finalizer to be run in the event that the + * fiber running `async(k)` is canceled. + * + * For example, here is a simplified version of `IO.fromCompletableFuture`: + * + * {{{ + * def fromCompletableFuture[A](fut: IO[CompletableFuture[A]]): IO[A] = { + * fut.flatMap { cf => + * IO.asyncPoll { cb => + * if (cf.isDone) { + * //Register immediately available result of the completable future or handle an error + * IO(cf.get) + * .map(Right(_)) + * .handleError { e => + * cb(Left(e)) + * Left(None) + * } + * } else { + * IO { + * //Invoke the callback with the result of the completable future + * val stage = cf.handle[Unit] { + * case (a, null) => cb(Right(a)) + * case (_, e) => cb(Left(e)) + * } + * + * //Cancel the completable future if the fiber is canceled + * Left(Some(IO(stage.cancel(false)).void)) + * } + * } + * } + * } + * } + * }}} + * + * @see + * [[async]] for a simplified variant without an option for immediate result + */ + def asyncPoll[A]( + k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] = { + val body = new Cont[IO, A, A] { + def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) => + G.uncancelable { poll => + lift(k(resume)) flatMap { + case Right(a) => G.pure(a) + case Left(Some(fin)) => G.onCancel(poll(get), lift(fin)) + case Left(None) => poll(get) + } + } + } + } + + IOCont(body, Tracing.calculateTracingEvent(k)) + } + /** * Suspends an asynchronous side effect in `IO`. * @@ -1149,6 +1219,8 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { * * @see * [[async]] + * @see + * [[asyncPoll]] */ def async_[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = { val body = new Cont[IO, A, A] { @@ -1599,6 +1671,9 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { private[this] val _asyncForIO: kernel.Async[IO] = new kernel.Async[IO] with StackSafeMonad[IO] { + override def asyncPoll[A](k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] = + IO.asyncPoll(k) + override def async[A](k: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]): IO[A] = IO.async(k) diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala b/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala index ae60aa71ac..7d02a4c711 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala @@ -62,22 +62,27 @@ import java.util.concurrent.atomic.AtomicReference trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] { /** - * The asynchronous FFI. + * The asynchronous FFI with option for immediate result. * * `k` takes a callback of type `Either[Throwable, A] => Unit` to signal the result of the - * asynchronous computation. The execution of `async(k)` is semantically blocked until the - * callback is invoked. + * asynchronous computation. * - * `k` returns an `Option[F[Unit]]` which is an optional finalizer to be run in the event that - * the fiber running {{{async(k)}}} is canceled. + * `k` returns an `Either[Option[F[Unit]], A]` where: + * - right side `A` signals immediately available result; + * - left side `Option[F[Unit]]` is an optional finalizer to be run in the event that + * the fiber running `async(k)` is canceled. + * + * In case where `k` returns left side the execution of `async(k)` is semantically blocked + * until the callback is invoked. In case of a right side callback invocation is silently dropped. */ - def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = { + def asyncPoll[A](k: (Either[Throwable, A] => Unit) => F[Either[Option[F[Unit]], A]]): F[A] = { val body = new Cont[F, A, A] { def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) => G.uncancelable { poll => lift(k(resume)) flatMap { - case Some(fin) => G.onCancel(poll(get), lift(fin)) - case None => poll(get) + case Right(a) => G.pure(a) + case Left(Some(fin)) => G.onCancel(poll(get), lift(fin)) + case Left(None) => poll(get) } } } @@ -86,6 +91,19 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] { cont(body) } + /** + * The asynchronous FFI. + * + * `k` takes a callback of type `Either[Throwable, A] => Unit` to signal the result of the + * asynchronous computation. The execution of `async(k)` is semantically blocked until the + * callback is invoked. + * + * `k` returns an `Option[F[Unit]]` which is an optional finalizer to be run in the event that + * the fiber running {{{async(k)}}} is canceled. + */ + def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = + asyncPoll[A](cb => map(k(cb))(Left(_))) + /** * A convenience version of [[Async.async]] for when we don't need to perform `F[_]` effects * or perform finalization in the event of cancelation. diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index fcd19ca34f..bf2bc11f1a 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -56,6 +56,18 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { i mustEqual 42 } + "preserve monad identity on asyncPoll immediate result" in ticked { implicit ticker => + val fa = IO.asyncPoll[Int](_ => IO(Right(42))) + fa.flatMap(i => IO.pure(i)) must completeAs(42) + fa must completeAs(42) + } + + "preserve monad identity on asyncPoll suspended result" in ticked { implicit ticker => + val fa = IO.asyncPoll[Int](cb => IO(cb(Right(42))).as(Left(None))) + fa.flatMap(i => IO.pure(i)) must completeAs(42) + fa must completeAs(42) + } + "preserve monad identity on async" in ticked { implicit ticker => val fa = IO.async[Int](cb => IO(cb(Right(42))).as(None)) fa.flatMap(i => IO.pure(i)) must completeAs(42) @@ -69,6 +81,11 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { IO(throw TestException) must failAs(TestException) } + "resume error continuation within asyncPoll" in ticked { implicit ticker => + case object TestException extends RuntimeException + IO.asyncPoll[Unit](k => IO(k(Left(TestException))).as(Left(None))) must failAs(TestException) + } + "resume error continuation within async" in ticked { implicit ticker => case object TestException extends RuntimeException IO.async[Unit](k => IO(k(Left(TestException))).as(None)) must failAs(TestException) @@ -411,6 +428,154 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { } + "asyncPoll" should { + + "resume value continuation within asyncPoll with immediate result" in ticked { implicit ticker => + IO.asyncPoll[Int](_ => IO(Right(42))) must completeAs(42) + } + + "resume value continuation within asyncPoll with suspended result" in ticked { implicit ticker => + IO.asyncPoll[Int](k => IO(k(Right(42))).as(Left(None))) must completeAs(42) + } + + "continue from the results of an asyncPoll immediate result produced prior to registration" in ticked { + implicit ticker => + IO.asyncPoll[Int](_ => IO(Right(42))).map(_ + 2) must completeAs(44) + } + + "continue from the results of an asyncPoll suspended result produced prior to registration" in ticked { + implicit ticker => + IO.asyncPoll[Int](cb => IO(cb(Right(42))).as(Left(None))).map(_ + 2) must completeAs(44) + } + + // format: off + "produce a failure when the registration raises an error after result" in ticked { implicit ticker => + case object TestException extends RuntimeException + + IO.asyncPoll[Int](_ => IO(Right(42)) + .flatMap(_ => IO.raiseError(TestException))) + .void must failAs(TestException) + } + // format: on + + // format: off + "produce a failure when the registration raises an error after callback" in ticked { implicit ticker => + case object TestException extends RuntimeException + + IO.asyncPoll[Int](cb => IO(cb(Right(42))) + .flatMap(_ => IO.raiseError(TestException))) + .void must failAs(TestException) + } + // format: on + + "ignore asyncPoll callback" in ticked { implicit ticker => + case object TestException extends RuntimeException + + var cb: Either[Throwable, Int] => Unit = null + + val asyncPoll = IO.asyncPoll[Int] { cb0 => IO { cb = cb0 } *> IO.pure(Right(42)) } + + val test = for { + fiber <- asyncPoll.start + _ <- IO(ticker.ctx.tick()) + _ <- IO(cb(Right(43))) + _ <- IO(ticker.ctx.tick()) + _ <- IO(cb(Left(TestException))) + _ <- IO(ticker.ctx.tick()) + value <- fiber.joinWithNever + } yield value + + test must completeAs(42) + } + + "ignore asyncPoll callback real" in real { + case object TestException extends RuntimeException + + var cb: Either[Throwable, Int] => Unit = null + + val test = for { + latch1 <- Deferred[IO, Unit] + latch2 <- Deferred[IO, Unit] + fiber <- + IO.asyncPoll[Int] { cb0 => + IO { cb = cb0 } *> latch1.complete(()) *> latch2.get *> IO.pure(Right(42)) + }.start + _ <- latch1.get + _ <- IO(cb(Right(43))) + _ <- IO(cb(Left(TestException))) + _ <- latch2.complete(()) + value <- fiber.joinWithNever + } yield value + + test.attempt.flatMap { n => IO(n mustEqual Right(42)) } + } + + "repeated asyncPoll callback" in ticked { implicit ticker => + case object TestException extends RuntimeException + + var cb: Either[Throwable, Int] => Unit = null + + val asyncPoll = IO.asyncPoll[Int] { cb0 => IO { cb = cb0} *> IO.pure(Left(None)) } + + val test = for { + fiber <- asyncPoll.start + _ <- IO(ticker.ctx.tick()) + _ <- IO(cb(Right(42))) + _ <- IO(ticker.ctx.tick()) + _ <- IO(cb(Right(43))) + _ <- IO(ticker.ctx.tick()) + _ <- IO(cb(Left(TestException))) + _ <- IO(ticker.ctx.tick()) + value <- fiber.joinWithNever + } yield value + + test must completeAs(42) + } + + "repeated asyncPoll callback real" in real { + case object TestException extends RuntimeException + + var cb: Either[Throwable, Int] => Unit = null + + val test = for { + latch1 <- Deferred[IO, Unit] + latch2 <- Deferred[IO, Unit] + fiber <- + IO.asyncPoll[Int] { cb0 => + IO { cb = cb0 } *> latch1.complete(()) *> latch2.get *> IO.pure(Left(None)) + }.start + _ <- latch1.get + _ <- IO(cb(Right(42))) + _ <- IO(cb(Right(43))) + _ <- IO(cb(Left(TestException))) + _ <- latch2.complete(()) + value <- fiber.joinWithNever + } yield value + + test.attempt.flatMap { n => IO(n mustEqual Right(42)) } + } + + "allow for misordered nesting" in ticked { implicit ticker => + var outerR = 0 + var innerR = 0 + + val outer = IO.asyncPoll[Int] { cb1 => + val inner = IO.asyncPoll[Int] { cb2 => + IO(cb1(Right(1))) *> + IO.executionContext.flatMap(ec => IO(ec.execute(() => cb2(Right(2))))).as(Left(None)) + } + + inner.flatMap(i => IO { innerR = i }).as(Left(None)) + } + + val test = outer.flatMap(i => IO { outerR = i }) + + test must completeAs(()) + outerR mustEqual 1 + innerR mustEqual 2 + } + } + "async" should { "resume value continuation within async" in ticked { implicit ticker => From 52c08c998ed67cffbf866c79bda64876713e7afb Mon Sep 17 00:00:00 2001 From: Alexey Shuksto Date: Mon, 11 Jul 2022 22:40:01 +0300 Subject: [PATCH 2/6] Format sources with scalafmt --- .../src/main/scala/cats/effect/IO.scala | 12 +++++----- .../main/scala/cats/effect/kernel/Async.scala | 9 +++---- .../src/test/scala/cats/effect/IOSpec.scala | 24 +++++++++++-------- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 7df441f35a..8828d10d4b 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -1068,11 +1068,10 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { * Suspends an asynchronous side effect with optional immediate result in `IO`. * * The given function will be invoked during evaluation of the `IO` to: - * - check if result is already available; - * - "schedule" the asynchronous callback, where the callback of type - * `Either[Throwable, A] => Unit` is the parameter passed to that function. - * Only the ''first'' invocation of the callback will be effective! - * All subsequent invocations will be silently dropped. + * - check if result is already available; + * - "schedule" the asynchronous callback, where the callback of type `Either[Throwable, A] + * \=> Unit` is the parameter passed to that function. Only the ''first'' invocation of + * the callback will be effective! All subsequent invocations will be silently dropped. * * The process of registering the callback itself is suspended in `IO` (the outer `IO` of * `IO[Either[Option[IO[Unit]], A]]`). @@ -1671,7 +1670,8 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { private[this] val _asyncForIO: kernel.Async[IO] = new kernel.Async[IO] with StackSafeMonad[IO] { - override def asyncPoll[A](k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] = + override def asyncPoll[A]( + k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] = IO.asyncPoll(k) override def async[A](k: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]): IO[A] = diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala b/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala index 7d02a4c711..1307d50c47 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala @@ -69,11 +69,12 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] { * * `k` returns an `Either[Option[F[Unit]], A]` where: * - right side `A` signals immediately available result; - * - left side `Option[F[Unit]]` is an optional finalizer to be run in the event that - * the fiber running `async(k)` is canceled. + * - left side `Option[F[Unit]]` is an optional finalizer to be run in the event that the + * fiber running `async(k)` is canceled. * - * In case where `k` returns left side the execution of `async(k)` is semantically blocked - * until the callback is invoked. In case of a right side callback invocation is silently dropped. + * In case where `k` returns left side the execution of `async(k)` is semantically blocked + * until the callback is invoked. In case of a right side callback invocation is silently + * dropped. */ def asyncPoll[A](k: (Either[Throwable, A] => Unit) => F[Either[Option[F[Unit]], A]]): F[A] = { val body = new Cont[F, A, A] { diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index bf2bc11f1a..f61b121c8d 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -83,7 +83,8 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { "resume error continuation within asyncPoll" in ticked { implicit ticker => case object TestException extends RuntimeException - IO.asyncPoll[Unit](k => IO(k(Left(TestException))).as(Left(None))) must failAs(TestException) + IO.asyncPoll[Unit](k => IO(k(Left(TestException))).as(Left(None))) must failAs( + TestException) } "resume error continuation within async" in ticked { implicit ticker => @@ -430,22 +431,23 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { "asyncPoll" should { - "resume value continuation within asyncPoll with immediate result" in ticked { implicit ticker => - IO.asyncPoll[Int](_ => IO(Right(42))) must completeAs(42) + "resume value continuation within asyncPoll with immediate result" in ticked { + implicit ticker => IO.asyncPoll[Int](_ => IO(Right(42))) must completeAs(42) } - "resume value continuation within asyncPoll with suspended result" in ticked { implicit ticker => - IO.asyncPoll[Int](k => IO(k(Right(42))).as(Left(None))) must completeAs(42) + "resume value continuation within asyncPoll with suspended result" in ticked { + implicit ticker => + IO.asyncPoll[Int](k => IO(k(Right(42))).as(Left(None))) must completeAs(42) } "continue from the results of an asyncPoll immediate result produced prior to registration" in ticked { - implicit ticker => - IO.asyncPoll[Int](_ => IO(Right(42))).map(_ + 2) must completeAs(44) + implicit ticker => IO.asyncPoll[Int](_ => IO(Right(42))).map(_ + 2) must completeAs(44) } "continue from the results of an asyncPoll suspended result produced prior to registration" in ticked { implicit ticker => - IO.asyncPoll[Int](cb => IO(cb(Right(42))).as(Left(None))).map(_ + 2) must completeAs(44) + IO.asyncPoll[Int](cb => IO(cb(Right(42))).as(Left(None))).map(_ + 2) must completeAs( + 44) } // format: off @@ -515,7 +517,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { var cb: Either[Throwable, Int] => Unit = null - val asyncPoll = IO.asyncPoll[Int] { cb0 => IO { cb = cb0} *> IO.pure(Left(None)) } + val asyncPoll = IO.asyncPoll[Int] { cb0 => IO { cb = cb0 } *> IO.pure(Left(None)) } val test = for { fiber <- asyncPoll.start @@ -562,7 +564,9 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { val outer = IO.asyncPoll[Int] { cb1 => val inner = IO.asyncPoll[Int] { cb2 => IO(cb1(Right(1))) *> - IO.executionContext.flatMap(ec => IO(ec.execute(() => cb2(Right(2))))).as(Left(None)) + IO.executionContext + .flatMap(ec => IO(ec.execute(() => cb2(Right(2))))) + .as(Left(None)) } inner.flatMap(i => IO { innerR = i }).as(Left(None)) From 673f7f5ecf492fb984955d49a5866a0b0de85b5e Mon Sep 17 00:00:00 2001 From: Alexey Shuksto Date: Wed, 13 Jul 2022 18:20:39 +0300 Subject: [PATCH 3/6] Add three more AsyncLaws tests and spec for basic Async implementation --- .../main/scala/cats/effect/kernel/Async.scala | 6 +- .../scala/cats/effect/laws/AsyncLaws.scala | 15 ++ .../scala/cats/effect/laws/AsyncTests.scala | 5 + .../scala/cats/effect/kernel/AsyncSpec.scala | 148 ++++++++++++++++++ 4 files changed, 171 insertions(+), 3 deletions(-) create mode 100644 tests/shared/src/test/scala/cats/effect/kernel/AsyncSpec.scala diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala b/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala index 1307d50c47..f5d33976af 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala @@ -198,9 +198,9 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] { * NOTE: This is a very low level api, end users should use `async` instead. * See cats.effect.kernel.Cont for more detail. * - * If you are an implementor, and you have `async`, `Async.defaultCont` - * provides an implementation of `cont` in terms of `async`. - * Note that if you use `defaultCont` you _have_ to override `async`. + * If you are an implementor, and you have `async` or `asyncPoll`, + * `Async.defaultCont` provides an implementation of `cont` in terms of `async`. + * Note that if you use `defaultCont` you _have_ to override `async/asyncPoll`. */ def cont[K, R](body: Cont[F, K, R]): F[R] } diff --git a/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala b/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala index 6b76d0bb50..e4586ecb25 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala @@ -26,6 +26,21 @@ import scala.util.{Left, Right} trait AsyncLaws[F[_]] extends GenTemporalLaws[F, Throwable] with SyncLaws[F] { implicit val F: Async[F] + // format: off + def asyncPollImmediateIsPure[A](a: A) = + (F.asyncPoll[A](_ => F.pure(Right(a))) <* F.unit) <-> (F.pure(a)) + // format: on + + // format: off + def asyncPollSuspendedRightIsAsyncRight[A](a: A, fu: F[Unit]) = + (F.asyncPoll[A](k => F.delay(k(Right(a))) >> fu.as(Left(None))) <* F.unit) <-> (F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <* F.unit) + // format: on + + // format: off + def asyncPollSuspendedLeftIsAsyncLeft[A](e: Throwable, fu: F[Unit]) = + (F.asyncPoll[A](k => F.delay(k(Left(e))) >> fu.as(Left(None))) <* F.unit) <-> (F.async[A](k => F.delay(k(Left(e))) >> fu.as(None)) <* F.unit) + // format: on + // format: off def asyncRightIsUncancelableSequencedPure[A](a: A, fu: F[Unit]) = (F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <* F.unit) <-> (F.uncancelable(_ => fu) >> F.pure(a)) diff --git a/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala b/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala index c0cbe63e09..07d3538c5f 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala @@ -239,6 +239,11 @@ trait AsyncTests[F[_]] extends GenTemporalTests[F, Throwable] with SyncTests[F] ) val props = Seq( + "asyncPoll immediate is pure" -> forAll(laws.asyncPollImmediateIsPure[A] _), + "asyncPoll suspended right is async right" -> forAll( + laws.asyncPollSuspendedRightIsAsyncRight[A] _), + "asyncPoll suspended left is async left" -> forAll( + laws.asyncPollSuspendedLeftIsAsyncLeft[A] _), "async right is uncancelable sequenced pure" -> forAll( laws.asyncRightIsUncancelableSequencedPure[A] _), "async left is uncancelable sequenced raiseError" -> forAll( diff --git a/tests/shared/src/test/scala/cats/effect/kernel/AsyncSpec.scala b/tests/shared/src/test/scala/cats/effect/kernel/AsyncSpec.scala new file mode 100644 index 0000000000..968b23d043 --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/kernel/AsyncSpec.scala @@ -0,0 +1,148 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package kernel + +import cats.{Eq, Order, StackSafeMonad} +import cats.arrow.FunctionK +import cats.effect.laws.AsyncTests +import cats.laws.discipline.arbitrary._ +import org.scalacheck.{Arbitrary, Cogen, Prop} +import org.scalacheck.Arbitrary.arbitrary +import org.typelevel.discipline.specs2.mutable.Discipline + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + +class AsyncSpec extends BaseSpec with Discipline { + + // we just need this because of the laws testing, since the prop runs can interfere with each other + sequential + + { + implicit val ticker = Ticker() + + checkAll( + "AsyncIO", + AsyncTests[AsyncIO].async[Int, Int, Int](10.millis) + ) /*(Parameters(seed = Some(Seed.fromBase64("ZxDXpm7_3Pdkl-Fvt8M90Cxfam9wKuzcifQ1QsIJxND=").get)))*/ + } + + final class AsyncIO[A](val io: IO[A]) + + implicit + def asyncForAsyncIO: Async[AsyncIO] = new Async[AsyncIO] with StackSafeMonad[AsyncIO] { + def pure[A](x: A): AsyncIO[A] = liftIO(IO.pure(x)) + def raiseError[A](e: Throwable): AsyncIO[A] = liftIO(IO.raiseError(e)) + def suspend[A](hint: Sync.Type)(thunk: => A): AsyncIO[A] = liftIO(IO.suspend(hint)(thunk)) + + def canceled: AsyncIO[Unit] = liftIO(IO.canceled) + def cede: AsyncIO[Unit] = liftIO(IO.cede) + def executionContext: AsyncIO[ExecutionContext] = liftIO(IO.executionContext) + + def monotonic: AsyncIO[FiniteDuration] = liftIO(IO.monotonic) + def realTime: AsyncIO[FiniteDuration] = liftIO(IO.realTime) + + def ref[A](a: A): AsyncIO[Ref[AsyncIO, A]] = delay(Ref.unsafe(a)(this)) + def deferred[A]: AsyncIO[Deferred[AsyncIO, A]] = delay(Deferred.unsafe(this)) + + def evalOn[A](fa: AsyncIO[A], ec: ExecutionContext): AsyncIO[A] = wrapIO(fa)(_.evalOn(ec)) + + def flatMap[A, B](fa: AsyncIO[A])(f: A => AsyncIO[B]): AsyncIO[B] = + wrapIO(fa)(_.flatMap(f(_).io)) + + def forceR[A, B](fa: AsyncIO[A])(fb: AsyncIO[B]): AsyncIO[B] = wrapIO(fa)(_.forceR(fb.io)) + + def handleErrorWith[A](fa: AsyncIO[A])(f: Throwable => AsyncIO[A]): AsyncIO[A] = + wrapIO(fa)(_.handleErrorWith(f(_).io)) + + def onCancel[A](fa: AsyncIO[A], fin: AsyncIO[Unit]): AsyncIO[A] = + wrapIO(fa)(_.onCancel(fin.io)) + + def sleep(time: FiniteDuration): AsyncIO[Unit] = liftIO(IO.sleep(time)) + + def cont[K, R](body: Cont[AsyncIO, K, R]): AsyncIO[R] = { + val lower: FunctionK[AsyncIO, IO] = new FunctionK[AsyncIO, IO] { + def apply[A](fa: AsyncIO[A]): IO[A] = fa.io + } + + val ioCont = new Cont[IO, K, R] { + def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) => + body.apply[G].apply(resume, get, lower.andThen(lift)) + } + } + + liftIO(IO.cont(ioCont)) + } + + def start[A](fa: AsyncIO[A]): AsyncIO[Fiber[AsyncIO, Throwable, A]] = + liftIO(fa.io.start.map(liftFiber)) + + def uncancelable[A](body: Poll[AsyncIO] => AsyncIO[A]): AsyncIO[A] = + liftIO { + IO.uncancelable { nat => + val natT = new Poll[AsyncIO] { + def apply[B](aio: AsyncIO[B]): AsyncIO[B] = wrapIO(aio)(nat(_)) + } + body(natT).io + } + } + + private val liftIO: FunctionK[IO, AsyncIO] = new FunctionK[IO, AsyncIO] { + def apply[A](fa: IO[A]): AsyncIO[A] = new AsyncIO(fa) + } + + private def liftOutcome[A](oc: Outcome[IO, Throwable, A]): Outcome[AsyncIO, Throwable, A] = + oc match { + case Outcome.Canceled() => Outcome.Canceled() + case Outcome.Errored(e) => Outcome.Errored(e) + case Outcome.Succeeded(foa) => Outcome.Succeeded(liftIO(foa)) + } + + private def liftFiber[A](fib: FiberIO[A]): Fiber[AsyncIO, Throwable, A] = + new Fiber[AsyncIO, Throwable, A] { + def cancel: AsyncIO[Unit] = liftIO(fib.cancel) + def join: AsyncIO[Outcome[AsyncIO, Throwable, A]] = liftIO(fib.join.map(liftOutcome)) + } + + private def wrapIO[A, B](aio: AsyncIO[A])(f: IO[A] => IO[B]): AsyncIO[B] = + liftIO(f(aio.io)) + + } + + implicit def arbitraryAsyncIO[A](implicit arbIO: Arbitrary[IO[A]]): Arbitrary[AsyncIO[A]] = + Arbitrary(arbitrary[IO[A]].map(new AsyncIO(_))) + + implicit def cogenOutcomeAsyncIO[A]( + implicit + cogenOutcomeIO: Cogen[Outcome[IO, Throwable, A]]): Cogen[Outcome[AsyncIO, Throwable, A]] = + cogenOutcomeIO.contramap { + case Outcome.Canceled() => Outcome.Canceled() + case Outcome.Errored(e) => Outcome.Errored(e) + case Outcome.Succeeded(aio) => Outcome.Succeeded(aio.io) + } + + implicit def eqAsyncIO[A](implicit eqIO: Eq[IO[A]]): Eq[AsyncIO[A]] = (x, y) => + eqIO.eqv(x.io, y.io) + + implicit def orderAsyncIO[A](implicit orderIO: Order[IO[A]]): Order[AsyncIO[A]] = (x, y) => + orderIO.compare(x.io, y.io) + + implicit def execAsyncIO[A](implicit execIO: IO[A] => Prop): AsyncIO[A] => Prop = aio => + execIO(aio.io) + +} From 2e5a38ca7b013abf286063a94162ecb32e8b1cb4 Mon Sep 17 00:00:00 2001 From: Alexey Shuksto Date: Wed, 13 Jul 2022 18:32:53 +0300 Subject: [PATCH 4/6] Fix scalafix --- tests/shared/src/test/scala/cats/effect/kernel/AsyncSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/shared/src/test/scala/cats/effect/kernel/AsyncSpec.scala b/tests/shared/src/test/scala/cats/effect/kernel/AsyncSpec.scala index 968b23d043..a2225e54de 100644 --- a/tests/shared/src/test/scala/cats/effect/kernel/AsyncSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/kernel/AsyncSpec.scala @@ -21,6 +21,7 @@ import cats.{Eq, Order, StackSafeMonad} import cats.arrow.FunctionK import cats.effect.laws.AsyncTests import cats.laws.discipline.arbitrary._ + import org.scalacheck.{Arbitrary, Cogen, Prop} import org.scalacheck.Arbitrary.arbitrary import org.typelevel.discipline.specs2.mutable.Discipline From e4e05aa09b5d65769673044377c64fd78f5902f6 Mon Sep 17 00:00:00 2001 From: Alexey Shuksto Date: Wed, 9 Nov 2022 18:48:27 +0300 Subject: [PATCH 5/6] Rename 'asyncPoll => asyncCheckAttempt', sync 'Async/IO' scaladoc --- .../src/main/scala/cats/effect/IO.scala | 30 ++++--- .../main/scala/cats/effect/kernel/Async.scala | 82 +++++++++++++------ .../scala/cats/effect/laws/AsyncLaws.scala | 12 +-- .../scala/cats/effect/laws/AsyncTests.scala | 10 +-- .../src/test/scala/cats/effect/IOSpec.scala | 58 ++++++------- 5 files changed, 118 insertions(+), 74 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index d1ae1a06ef..e9547fd008 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -1075,7 +1075,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { /** * Suspends an asynchronous side effect with optional immediate result in `IO`. * - * The given function will be invoked during evaluation of the `IO` to: + * The given function `k` will be invoked during evaluation of the `IO` to: * - check if result is already available; * - "schedule" the asynchronous callback, where the callback of type `Either[Throwable, A] * \=> Unit` is the parameter passed to that function. Only the ''first'' invocation of @@ -1088,14 +1088,14 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { * - right side `A` is an immediate result of computation (callback invocation will be * dropped); * - left side `Option[IO[Unit]] `is an optional finalizer to be run in the event that the - * fiber running `async(k)` is canceled. + * fiber running `asyncCheckAttempt(k)` is canceled. * * For example, here is a simplified version of `IO.fromCompletableFuture`: * * {{{ * def fromCompletableFuture[A](fut: IO[CompletableFuture[A]]): IO[A] = { * fut.flatMap { cf => - * IO.asyncPoll { cb => + * IO.asyncCheckAttempt { cb => * if (cf.isDone) { * //Register immediately available result of the completable future or handle an error * IO(cf.get) @@ -1121,10 +1121,12 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { * } * }}} * + * Note that `asyncCheckAttempt` is uncancelable during its registration. + * * @see * [[async]] for a simplified variant without an option for immediate result */ - def asyncPoll[A]( + def asyncCheckAttempt[A]( k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] = { val body = new Cont[IO, A, A] { def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) => @@ -1144,7 +1146,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { /** * Suspends an asynchronous side effect in `IO`. * - * The given function will be invoked during evaluation of the `IO` to "schedule" the + * The given function `k` will be invoked during evaluation of the `IO` to "schedule" the * asynchronous callback, where the callback of type `Either[Throwable, A] => Unit` is the * parameter passed to that function. Only the ''first'' invocation of the callback will be * effective! All subsequent invocations will be silently dropped. @@ -1176,8 +1178,13 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { * } * }}} * + * Note that `async` is uncancelable during its registration. + * * @see * [[async_]] for a simplified variant without a finalizer + * @see + * [[asyncCheckAttempt]] for more generic version providing an optional immediate result + * of computation */ def async[A](k: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]): IO[A] = { val body = new Cont[IO, A, A] { @@ -1197,7 +1204,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { /** * Suspends an asynchronous side effect in `IO`. * - * The given function will be invoked during evaluation of the `IO` to "schedule" the + * The given function `k` will be invoked during evaluation of the `IO` to "schedule" the * asynchronous callback, where the callback is the parameter passed to that function. Only * the ''first'' invocation of the callback will be effective! All subsequent invocations will * be silently dropped. @@ -1224,10 +1231,13 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { * This function can be thought of as a safer, lexically-constrained version of `Promise`, * where `IO` is like a safer, lazy version of `Future`. * + * Also, note that `async` is uncancelable during its registration. + * * @see - * [[async]] + * [[async]] for more generic version providing a finalizer * @see - * [[asyncPoll]] + * [[asyncCheckAttempt]] for more generic version providing an optional immediate result + * of computation and a finalizer */ def async_[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = { val body = new Cont[IO, A, A] { @@ -1681,9 +1691,9 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { private[this] val _asyncForIO: kernel.Async[IO] = new kernel.Async[IO] with StackSafeMonad[IO] { - override def asyncPoll[A]( + override def asyncCheckAttempt[A]( k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] = - IO.asyncPoll(k) + IO.asyncCheckAttempt(k) override def async[A](k: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]): IO[A] = IO.async(k) diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala b/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala index 0a9f3f5fa1..fab8e244aa 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala @@ -62,21 +62,32 @@ import java.util.concurrent.atomic.AtomicReference trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] { /** - * The asynchronous FFI with option for immediate result. + * Suspends an asynchronous side effect with optional immediate result in `F`. * - * `k` takes a callback of type `Either[Throwable, A] => Unit` to signal the result of the - * asynchronous computation. + * The given function `k` will be invoked during evaluation of `F` to: + * - check if result is already available; + * - "schedule" the asynchronous callback, where the callback of type `Either[Throwable, A] + * \=> Unit` is the parameter passed to that function. Only the ''first'' invocation of + * the callback will be effective! All subsequent invocations will be silently dropped. * - * `k` returns an `Either[Option[F[Unit]], A]` where: - * - right side `A` signals immediately available result; - * - left side `Option[F[Unit]]` is an optional finalizer to be run in the event that the - * fiber running `async(k)` is canceled. + * The process of registering the callback itself is suspended in `F` (the outer `F` of + * `F[Either[Option[G[Unit]], A]]`). * - * In case where `k` returns left side the execution of `async(k)` is semantically blocked - * until the callback is invoked. In case of a right side callback invocation is silently - * dropped. + * The effect returns `Either[Option[F[Unit]], A]` where: + * - right side `A` is an immediate result of computation (callback invocation will be + * dropped); + * - left side `Option[F[Unit]] `is an optional finalizer to be run in the event that the + * fiber running `asyncCheckAttempt(k)` is canceled. + * + * Also, note that `asyncCheckAttempt` is uncancelable during its registration. + * + * @see + * [[async]] for a simplified variant without an option for immediate result + * @see + * [[async_]] for a simplified variant without an option for immediate result or finalizer */ - def asyncPoll[A](k: (Either[Throwable, A] => Unit) => F[Either[Option[F[Unit]], A]]): F[A] = { + def asyncCheckAttempt[A]( + k: (Either[Throwable, A] => Unit) => F[Either[Option[F[Unit]], A]]): F[A] = { val body = new Cont[F, A, A] { def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) => G.uncancelable { poll => @@ -93,25 +104,48 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] { } /** - * The asynchronous FFI. + * Suspends an asynchronous side effect in `F`. + * + * The given function `k` will be invoked during evaluation of the `F` to "schedule" the + * asynchronous callback, where the callback of type `Either[Throwable, A] => Unit` is the + * parameter passed to that function. Only the ''first'' invocation of the callback will be + * effective! All subsequent invocations will be silently dropped. * - * `k` takes a callback of type `Either[Throwable, A] => Unit` to signal the result of the - * asynchronous computation. The execution of `async(k)` is semantically blocked until the - * callback is invoked. + * The process of registering the callback itself is suspended in `F` (the outer `F` of + * `F[Option[F[Unit]]]`). * - * `k` returns an `Option[F[Unit]]` which is an optional finalizer to be run in the event that - * the fiber running `async(k)` is canceled. If passed `k` is `None`, then created effect will - * be uncancelable. + * The effect returns `Option[F[Unit]]` which is an optional finalizer to be run in the event + * that the fiber running `async(k)` is canceled. * * Also, note that `async` is uncancelable during its registration. + * + * @see + * [[async_]] for a simplified variant without a finalizer + * @see + * [[asyncCheckAttempt]] for more generic version with option of providing immediate result + * of computation */ def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = - asyncPoll[A](cb => map(k(cb))(Left(_))) + asyncCheckAttempt[A](cb => map(k(cb))(Left(_))) /** - * A convenience version of [[Async.async]] for when we don't need to perform `F[_]` effects - * or perform finalization in the event of cancelation (i.e. created effect will be - * uncancelable). + * Suspends an asynchronous side effect in `F`. + * + * The given function `k` will be invoked during evaluation of the `F` to "schedule" the + * asynchronous callback, where the callback is the parameter passed to that function. Only + * the ''first'' invocation of the callback will be effective! All subsequent invocations will + * be silently dropped. + * + * This function can be thought of as a safer, lexically-constrained version of `Promise`, + * where `IO` is like a safer, lazy version of `Future`. + * + * Also, note that `async` is uncancelable during its registration. + * + * @see + * [[async]] for more generic version providing a finalizer + * @see + * [[asyncCheckAttempt]] for more generic version with option of providing immediate result + * of computation and finalizer */ def async_[A](k: (Either[Throwable, A] => Unit) => Unit): F[A] = async[A](cb => as(delay(k(cb)), None)) @@ -202,9 +236,9 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] { * NOTE: This is a very low level api, end users should use `async` instead. * See cats.effect.kernel.Cont for more detail. * - * If you are an implementor, and you have `async` or `asyncPoll`, + * If you are an implementor, and you have `async` or `asyncCheckAttempt`, * `Async.defaultCont` provides an implementation of `cont` in terms of `async`. - * Note that if you use `defaultCont` you _have_ to override `async/asyncPoll`. + * Note that if you use `defaultCont` you _have_ to override `async/asyncCheckAttempt`. */ def cont[K, R](body: Cont[F, K, R]): F[R] } diff --git a/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala b/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala index e4586ecb25..cfddeba102 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala @@ -27,18 +27,18 @@ trait AsyncLaws[F[_]] extends GenTemporalLaws[F, Throwable] with SyncLaws[F] { implicit val F: Async[F] // format: off - def asyncPollImmediateIsPure[A](a: A) = - (F.asyncPoll[A](_ => F.pure(Right(a))) <* F.unit) <-> (F.pure(a)) + def asyncCheckAttemptImmediateIsPure[A](a: A) = + (F.asyncCheckAttempt[A](_ => F.pure(Right(a))) <* F.unit) <-> (F.pure(a)) // format: on // format: off - def asyncPollSuspendedRightIsAsyncRight[A](a: A, fu: F[Unit]) = - (F.asyncPoll[A](k => F.delay(k(Right(a))) >> fu.as(Left(None))) <* F.unit) <-> (F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <* F.unit) + def asyncCheckAttemptSuspendedRightIsAsyncRight[A](a: A, fu: F[Unit]) = + (F.asyncCheckAttempt[A](k => F.delay(k(Right(a))) >> fu.as(Left(None))) <* F.unit) <-> (F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <* F.unit) // format: on // format: off - def asyncPollSuspendedLeftIsAsyncLeft[A](e: Throwable, fu: F[Unit]) = - (F.asyncPoll[A](k => F.delay(k(Left(e))) >> fu.as(Left(None))) <* F.unit) <-> (F.async[A](k => F.delay(k(Left(e))) >> fu.as(None)) <* F.unit) + def asyncCheckAttemptSuspendedLeftIsAsyncLeft[A](e: Throwable, fu: F[Unit]) = + (F.asyncCheckAttempt[A](k => F.delay(k(Left(e))) >> fu.as(Left(None))) <* F.unit) <-> (F.async[A](k => F.delay(k(Left(e))) >> fu.as(None)) <* F.unit) // format: on // format: off diff --git a/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala b/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala index 07d3538c5f..de4e4b7be1 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala @@ -239,11 +239,11 @@ trait AsyncTests[F[_]] extends GenTemporalTests[F, Throwable] with SyncTests[F] ) val props = Seq( - "asyncPoll immediate is pure" -> forAll(laws.asyncPollImmediateIsPure[A] _), - "asyncPoll suspended right is async right" -> forAll( - laws.asyncPollSuspendedRightIsAsyncRight[A] _), - "asyncPoll suspended left is async left" -> forAll( - laws.asyncPollSuspendedLeftIsAsyncLeft[A] _), + "asyncCheckAttempt immediate is pure" -> forAll(laws.asyncCheckAttemptImmediateIsPure[A] _), + "asyncCheckAttempt suspended right is async right" -> forAll( + laws.asyncCheckAttemptSuspendedRightIsAsyncRight[A] _), + "asyncCheckAttempt suspended left is async left" -> forAll( + laws.asyncCheckAttemptSuspendedLeftIsAsyncLeft[A] _), "async right is uncancelable sequenced pure" -> forAll( laws.asyncRightIsUncancelableSequencedPure[A] _), "async left is uncancelable sequenced raiseError" -> forAll( diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index 4d300adef5..40adf6e7f9 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -56,14 +56,14 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { i mustEqual 42 } - "preserve monad identity on asyncPoll immediate result" in ticked { implicit ticker => - val fa = IO.asyncPoll[Int](_ => IO(Right(42))) + "preserve monad identity on asyncCheckAttempt immediate result" in ticked { implicit ticker => + val fa = IO.asyncCheckAttempt[Int](_ => IO(Right(42))) fa.flatMap(i => IO.pure(i)) must completeAs(42) fa must completeAs(42) } - "preserve monad identity on asyncPoll suspended result" in ticked { implicit ticker => - val fa = IO.asyncPoll[Int](cb => IO(cb(Right(42))).as(Left(None))) + "preserve monad identity on asyncCheckAttempt suspended result" in ticked { implicit ticker => + val fa = IO.asyncCheckAttempt[Int](cb => IO(cb(Right(42))).as(Left(None))) fa.flatMap(i => IO.pure(i)) must completeAs(42) fa must completeAs(42) } @@ -81,9 +81,9 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { IO(throw TestException) must failAs(TestException) } - "resume error continuation within asyncPoll" in ticked { implicit ticker => + "resume error continuation within asyncCheckAttempt" in ticked { implicit ticker => case object TestException extends RuntimeException - IO.asyncPoll[Unit](k => IO(k(Left(TestException))).as(Left(None))) must failAs( + IO.asyncCheckAttempt[Unit](k => IO(k(Left(TestException))).as(Left(None))) must failAs( TestException) } @@ -429,24 +429,24 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { } - "asyncPoll" should { + "asyncCheckAttempt" should { - "resume value continuation within asyncPoll with immediate result" in ticked { - implicit ticker => IO.asyncPoll[Int](_ => IO(Right(42))) must completeAs(42) + "resume value continuation within asyncCheckAttempt with immediate result" in ticked { + implicit ticker => IO.asyncCheckAttempt[Int](_ => IO(Right(42))) must completeAs(42) } - "resume value continuation within asyncPoll with suspended result" in ticked { + "resume value continuation within asyncCheckAttempt with suspended result" in ticked { implicit ticker => - IO.asyncPoll[Int](k => IO(k(Right(42))).as(Left(None))) must completeAs(42) + IO.asyncCheckAttempt[Int](k => IO(k(Right(42))).as(Left(None))) must completeAs(42) } - "continue from the results of an asyncPoll immediate result produced prior to registration" in ticked { - implicit ticker => IO.asyncPoll[Int](_ => IO(Right(42))).map(_ + 2) must completeAs(44) + "continue from the results of an asyncCheckAttempt immediate result produced prior to registration" in ticked { + implicit ticker => IO.asyncCheckAttempt[Int](_ => IO(Right(42))).map(_ + 2) must completeAs(44) } - "continue from the results of an asyncPoll suspended result produced prior to registration" in ticked { + "continue from the results of an asyncCheckAttempt suspended result produced prior to registration" in ticked { implicit ticker => - IO.asyncPoll[Int](cb => IO(cb(Right(42))).as(Left(None))).map(_ + 2) must completeAs( + IO.asyncCheckAttempt[Int](cb => IO(cb(Right(42))).as(Left(None))).map(_ + 2) must completeAs( 44) } @@ -454,7 +454,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { "produce a failure when the registration raises an error after result" in ticked { implicit ticker => case object TestException extends RuntimeException - IO.asyncPoll[Int](_ => IO(Right(42)) + IO.asyncCheckAttempt[Int](_ => IO(Right(42)) .flatMap(_ => IO.raiseError(TestException))) .void must failAs(TestException) } @@ -464,21 +464,21 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { "produce a failure when the registration raises an error after callback" in ticked { implicit ticker => case object TestException extends RuntimeException - IO.asyncPoll[Int](cb => IO(cb(Right(42))) + IO.asyncCheckAttempt[Int](cb => IO(cb(Right(42))) .flatMap(_ => IO.raiseError(TestException))) .void must failAs(TestException) } // format: on - "ignore asyncPoll callback" in ticked { implicit ticker => + "ignore asyncCheckAttempt callback" in ticked { implicit ticker => case object TestException extends RuntimeException var cb: Either[Throwable, Int] => Unit = null - val asyncPoll = IO.asyncPoll[Int] { cb0 => IO { cb = cb0 } *> IO.pure(Right(42)) } + val asyncCheckAttempt = IO.asyncCheckAttempt[Int] { cb0 => IO { cb = cb0 } *> IO.pure(Right(42)) } val test = for { - fiber <- asyncPoll.start + fiber <- asyncCheckAttempt.start _ <- IO(ticker.ctx.tick()) _ <- IO(cb(Right(43))) _ <- IO(ticker.ctx.tick()) @@ -490,7 +490,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { test must completeAs(42) } - "ignore asyncPoll callback real" in real { + "ignore asyncCheckAttempt callback real" in real { case object TestException extends RuntimeException var cb: Either[Throwable, Int] => Unit = null @@ -499,7 +499,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { latch1 <- Deferred[IO, Unit] latch2 <- Deferred[IO, Unit] fiber <- - IO.asyncPoll[Int] { cb0 => + IO.asyncCheckAttempt[Int] { cb0 => IO { cb = cb0 } *> latch1.complete(()) *> latch2.get *> IO.pure(Right(42)) }.start _ <- latch1.get @@ -512,15 +512,15 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { test.attempt.flatMap { n => IO(n mustEqual Right(42)) } } - "repeated asyncPoll callback" in ticked { implicit ticker => + "repeated asyncCheckAttempt callback" in ticked { implicit ticker => case object TestException extends RuntimeException var cb: Either[Throwable, Int] => Unit = null - val asyncPoll = IO.asyncPoll[Int] { cb0 => IO { cb = cb0 } *> IO.pure(Left(None)) } + val asyncCheckAttempt = IO.asyncCheckAttempt[Int] { cb0 => IO { cb = cb0 } *> IO.pure(Left(None)) } val test = for { - fiber <- asyncPoll.start + fiber <- asyncCheckAttempt.start _ <- IO(ticker.ctx.tick()) _ <- IO(cb(Right(42))) _ <- IO(ticker.ctx.tick()) @@ -534,7 +534,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { test must completeAs(42) } - "repeated asyncPoll callback real" in real { + "repeated asyncCheckAttempt callback real" in real { case object TestException extends RuntimeException var cb: Either[Throwable, Int] => Unit = null @@ -543,7 +543,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { latch1 <- Deferred[IO, Unit] latch2 <- Deferred[IO, Unit] fiber <- - IO.asyncPoll[Int] { cb0 => + IO.asyncCheckAttempt[Int] { cb0 => IO { cb = cb0 } *> latch1.complete(()) *> latch2.get *> IO.pure(Left(None)) }.start _ <- latch1.get @@ -561,8 +561,8 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { var outerR = 0 var innerR = 0 - val outer = IO.asyncPoll[Int] { cb1 => - val inner = IO.asyncPoll[Int] { cb2 => + val outer = IO.asyncCheckAttempt[Int] { cb1 => + val inner = IO.asyncCheckAttempt[Int] { cb2 => IO(cb1(Right(1))) *> IO.executionContext .flatMap(ec => IO(ec.execute(() => cb2(Right(2))))) From 719c106e3aa81af6af68cb38e5a8809c37dd7c04 Mon Sep 17 00:00:00 2001 From: Alexey Shuksto Date: Mon, 14 Nov 2022 13:16:09 +0300 Subject: [PATCH 6/6] Run scalafmtAll --- .../src/main/scala/cats/effect/IO.scala | 8 ++-- .../scala/cats/effect/laws/AsyncTests.scala | 3 +- .../src/test/scala/cats/effect/IOSpec.scala | 39 ++++++++++++------- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index e9547fd008..6b81ccc365 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -1183,8 +1183,8 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { * @see * [[async_]] for a simplified variant without a finalizer * @see - * [[asyncCheckAttempt]] for more generic version providing an optional immediate result - * of computation + * [[asyncCheckAttempt]] for more generic version providing an optional immediate result of + * computation */ def async[A](k: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]): IO[A] = { val body = new Cont[IO, A, A] { @@ -1236,8 +1236,8 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { * @see * [[async]] for more generic version providing a finalizer * @see - * [[asyncCheckAttempt]] for more generic version providing an optional immediate result - * of computation and a finalizer + * [[asyncCheckAttempt]] for more generic version providing an optional immediate result of + * computation and a finalizer */ def async_[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = { val body = new Cont[IO, A, A] { diff --git a/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala b/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala index de4e4b7be1..916846ded1 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/AsyncTests.scala @@ -239,7 +239,8 @@ trait AsyncTests[F[_]] extends GenTemporalTests[F, Throwable] with SyncTests[F] ) val props = Seq( - "asyncCheckAttempt immediate is pure" -> forAll(laws.asyncCheckAttemptImmediateIsPure[A] _), + "asyncCheckAttempt immediate is pure" -> forAll( + laws.asyncCheckAttemptImmediateIsPure[A] _), "asyncCheckAttempt suspended right is async right" -> forAll( laws.asyncCheckAttemptSuspendedRightIsAsyncRight[A] _), "asyncCheckAttempt suspended left is async left" -> forAll( diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index 40adf6e7f9..d4395f6b20 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -56,16 +56,18 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { i mustEqual 42 } - "preserve monad identity on asyncCheckAttempt immediate result" in ticked { implicit ticker => - val fa = IO.asyncCheckAttempt[Int](_ => IO(Right(42))) - fa.flatMap(i => IO.pure(i)) must completeAs(42) - fa must completeAs(42) + "preserve monad identity on asyncCheckAttempt immediate result" in ticked { + implicit ticker => + val fa = IO.asyncCheckAttempt[Int](_ => IO(Right(42))) + fa.flatMap(i => IO.pure(i)) must completeAs(42) + fa must completeAs(42) } - "preserve monad identity on asyncCheckAttempt suspended result" in ticked { implicit ticker => - val fa = IO.asyncCheckAttempt[Int](cb => IO(cb(Right(42))).as(Left(None))) - fa.flatMap(i => IO.pure(i)) must completeAs(42) - fa must completeAs(42) + "preserve monad identity on asyncCheckAttempt suspended result" in ticked { + implicit ticker => + val fa = IO.asyncCheckAttempt[Int](cb => IO(cb(Right(42))).as(Left(None))) + fa.flatMap(i => IO.pure(i)) must completeAs(42) + fa must completeAs(42) } "preserve monad identity on async" in ticked { implicit ticker => @@ -441,13 +443,15 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { } "continue from the results of an asyncCheckAttempt immediate result produced prior to registration" in ticked { - implicit ticker => IO.asyncCheckAttempt[Int](_ => IO(Right(42))).map(_ + 2) must completeAs(44) + implicit ticker => + val fa = IO.asyncCheckAttempt[Int](_ => IO(Right(42))).map(_ + 2) + fa must completeAs(44) } "continue from the results of an asyncCheckAttempt suspended result produced prior to registration" in ticked { implicit ticker => - IO.asyncCheckAttempt[Int](cb => IO(cb(Right(42))).as(Left(None))).map(_ + 2) must completeAs( - 44) + val fa = IO.asyncCheckAttempt[Int](cb => IO(cb(Right(42))).as(Left(None))).map(_ + 2) + fa must completeAs(44) } // format: off @@ -464,9 +468,10 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { "produce a failure when the registration raises an error after callback" in ticked { implicit ticker => case object TestException extends RuntimeException - IO.asyncCheckAttempt[Int](cb => IO(cb(Right(42))) + val fa = IO.asyncCheckAttempt[Int](cb => IO(cb(Right(42))) .flatMap(_ => IO.raiseError(TestException))) - .void must failAs(TestException) + .void + fa must failAs(TestException) } // format: on @@ -475,7 +480,9 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { var cb: Either[Throwable, Int] => Unit = null - val asyncCheckAttempt = IO.asyncCheckAttempt[Int] { cb0 => IO { cb = cb0 } *> IO.pure(Right(42)) } + val asyncCheckAttempt = IO.asyncCheckAttempt[Int] { cb0 => + IO { cb = cb0 } *> IO.pure(Right(42)) + } val test = for { fiber <- asyncCheckAttempt.start @@ -517,7 +524,9 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { var cb: Either[Throwable, Int] => Unit = null - val asyncCheckAttempt = IO.asyncCheckAttempt[Int] { cb0 => IO { cb = cb0 } *> IO.pure(Left(None)) } + val asyncCheckAttempt = IO.asyncCheckAttempt[Int] { cb0 => + IO { cb = cb0 } *> IO.pure(Left(None)) + } val test = for { fiber <- asyncCheckAttempt.start