diff --git a/build.sbt b/build.sbt index 8da993799f..2a91199d62 100644 --- a/build.sbt +++ b/build.sbt @@ -222,13 +222,12 @@ lazy val kernel = crossProject(JSPlatform, JVMPlatform) libraryDependencies += "org.specs2" %%% "specs2-core" % Specs2Version % Test) .settings(dottyLibrarySettings) .settings(libraryDependencies += "org.typelevel" %%% "cats-core" % CatsVersion) - .jsSettings( - Compile / doc / sources := { - if (scalaVersion.value == "3.0.0-RC2") - Seq() - else - (Compile / doc / sources).value - }) + .jsSettings(Compile / doc / sources := { + if (scalaVersion.value == "3.0.0-RC2") + Seq() + else + (Compile / doc / sources).value + }) /** * Reference implementations (including a pure ConcurrentBracket), generic ScalaCheck @@ -329,7 +328,14 @@ lazy val std = crossProject(JSPlatform, JVMPlatform) else "org.specs2" %%% "specs2-scalacheck" % Specs2Version % Test }, - libraryDependencies += "org.scalacheck" %%% "scalacheck" % ScalaCheckVersion % Test + libraryDependencies += "org.scalacheck" %%% "scalacheck" % ScalaCheckVersion % Test, + mimaBinaryIssueFilters ++= Seq( + // introduced by #1889, removal of private classes + ProblemFilters.exclude[MissingClassProblem]("cats.effect.std.Queue$AbstractQueue"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.std.Queue$BoundedQueue"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.std.Queue$DroppingQueue"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.std.Queue$CircularBufferQueue") + ) ) /** diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/Deferred.scala b/kernel/shared/src/main/scala/cats/effect/kernel/Deferred.scala index 344f83272f..48309acb66 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/Deferred.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/Deferred.scala @@ -67,6 +67,13 @@ object Deferred { def apply[F[_], A](implicit F: GenConcurrent[F, _]): F[Deferred[F, A]] = F.deferred[A] + /** + * Like `apply` but returns `AsyncDeferred`, which provides methods + * to change its effect type without use of a natural transformation. + */ + def async[F[_], A](implicit F: Async[F]): F[AsyncDeferred[F, A]] = + F.delay(new AsyncDeferred[F, A]) + /** * Like `apply` but returns the newly allocated Deferred directly * instead of wrapping it in `F.delay`. This method is considered @@ -91,11 +98,11 @@ object Deferred { val dummyId = 0L } - final class AsyncDeferred[F[_], A](implicit F: Async[F]) extends Deferred[F, A] { - // shared mutable state - private[this] val ref = new AtomicReference[State[A]]( - State.Unset(LongMap.empty, State.initialId) - ) + private[kernel] sealed trait AsyncDeferredSource[F[_], A] + extends DeferredSource[F, A] + with AsyncDeferredLike[A] { + protected implicit def F: Async[F] + private[Deferred] val ref: AtomicReference[State[A]] def get: F[A] = { // side-effectful @@ -157,6 +164,12 @@ object Deferred { case State.Unset(_, _) => None } } + } + + sealed trait SyncDeferredSink[F[_], A] extends DeferredSink[F, A] { self => + protected implicit def F: Sync[F] + + private[Deferred] val ref: AtomicReference[State[A]] def complete(a: A): F[Boolean] = { def notifyReaders(readers: LongMap[A => Unit]): F[Unit] = { @@ -192,6 +205,45 @@ object Deferred { F.defer(loop()) } + + /* + * Returns a new view of this `SyncDeferredSink` instance that shares + * the same state but which suspends operations in `G` rather than `F`. + * + * Similar to `mapK`, but requires a [[Sync]] instance instead of a natural + * transformation. + */ + def transformSync[G[_]](implicit G: Sync[G]): SyncDeferredSink[G, A] = + new SyncDeferredSink[G, A] { + override protected val F: Sync[G] = G + override private[Deferred] val ref: AtomicReference[State[A]] = self.ref + } + } + + sealed trait AsyncDeferredLike[A] { + def transformAsync[G[_]](implicit G: Async[G]): AsyncDeferred[G, A] + } + + sealed class AsyncDeferred[F[_], A](implicit protected val F: Async[F]) + extends Deferred[F, A] + with AsyncDeferredSource[F, A] + with SyncDeferredSink[F, A] { self => + // shared mutable state + private[Deferred] val ref = new AtomicReference[State[A]]( + State.Unset(LongMap.empty, State.initialId) + ) + + /* + * Returns a new view of this `AsyncDeferred` instance that shares + * the same state but which suspends operations in `G` rather than `F`. + * + * Similar to `mapK`, but requires an [[Async]] instance instead of + * a natural transformation. + */ + def transformAsync[G[_]](implicit G: Async[G]): AsyncDeferred[G, A] = + new AsyncDeferred[G, A] { + override private[Deferred] val ref: AtomicReference[State[A]] = self.ref + } } implicit def catsInvariantForDeferred[F[_]: Functor]: Invariant[Deferred[F, *]] = diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/Ref.scala b/kernel/shared/src/main/scala/cats/effect/kernel/Ref.scala index 51f628755f..41cc014364 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/Ref.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/Ref.scala @@ -177,6 +177,15 @@ object Ref { */ def of[F[_], A](a: A)(implicit mk: Make[F]): F[Ref[F, A]] = mk.refOf(a) + /** + * Like [[of]], but returns `SyncRef` which provides the [[SyncRef#transformSync]] + * method to change its effect type without use of a natural transformation. + */ + def sync[F[_], A](a: A)(implicit F: Sync[F]): F[SyncRef[F, A]] = syncIn[F, F, A](a) + + def syncIn[F[_], G[_], A](a: A)(implicit F: Sync[F], G: Sync[G]): F[SyncRef[G, A]] = + F.delay(new SyncRef(new AtomicReference(a))) + /** * Creates a Ref starting with the value of the one in `source`. * @@ -269,7 +278,7 @@ object Ref { def of[A](a: A): F[Ref[F, A]] = mk.refOf(a) } - final private class SyncRef[F[_], A](ar: AtomicReference[A])(implicit F: Sync[F]) + final class SyncRef[F[_], A] private[Ref] (ar: AtomicReference[A])(implicit F: Sync[F]) extends Ref[F, A] { def get: F[A] = F.delay(ar.get) @@ -350,6 +359,8 @@ object Ref { val f = state.runF.value modify(a => f(a).value) } + + def transformSync[G[_]: Sync]: SyncRef[G, A] = new SyncRef(ar) } final private[kernel] class TransformedRef[F[_], G[_], A]( diff --git a/std/shared/src/main/scala/cats/effect/std/Queue.scala b/std/shared/src/main/scala/cats/effect/std/Queue.scala index 4cbef8fb49..8cfcea76b0 100644 --- a/std/shared/src/main/scala/cats/effect/std/Queue.scala +++ b/std/shared/src/main/scala/cats/effect/std/Queue.scala @@ -18,7 +18,9 @@ package cats package effect package std -import cats.effect.kernel.{Deferred, GenConcurrent, Poll, Ref} +import cats.effect.kernel.Deferred.AsyncDeferredLike +import cats.effect.kernel.Ref.SyncRef +import cats.effect.kernel.{Async, Deferred, GenConcurrent, MonadCancel, Poll, Ref, Sync} import cats.effect.kernel.syntax.all._ import cats.syntax.all._ @@ -55,6 +57,10 @@ abstract class Queue[F[_], A] extends QueueSource[F, A] with QueueSink[F, A] { s } } +abstract class AsyncQueue[F[_], A] extends Queue[F, A] { + def transformAsync[G[_]: Async]: AsyncQueue[G, A] +} + object Queue { /** @@ -68,7 +74,16 @@ object Queue { */ def bounded[F[_], A](capacity: Int)(implicit F: GenConcurrent[F, _]): F[Queue[F, A]] = { assertNonNegative(capacity) - F.ref(State.empty[F, A]).map(new BoundedQueue(capacity, _)) + F.ref(State.empty[Deferred[F, *], A]) + .map(ref => new GenConcurrentImpl(capacity, ref, Strategy.bounded)) + } + + def boundedIn[F[_], G[_], A]( + capacity: Int)(implicit F: Sync[F], G: Async[G]): F[AsyncQueue[G, A]] = { + assertNonNegative(capacity) + Ref + .syncIn[F, G, State[AsyncDeferredLike, A]](State.empty) + .map(new AsyncImpl(capacity, _, Strategy.bounded)) } /** @@ -83,6 +98,9 @@ object Queue { def synchronous[F[_], A](implicit F: GenConcurrent[F, _]): F[Queue[F, A]] = bounded(0) + def synchronousIn[F[_]: Sync, G[_]: Async, A]: F[AsyncQueue[G, A]] = + boundedIn[F, G, A](0) + /** * Constructs an empty, unbounded queue for `F` data types that are * [[cats.effect.kernel.GenConcurrent]]. [[Queue#offer]] never blocks semantically, as there is @@ -93,6 +111,9 @@ object Queue { def unbounded[F[_], A](implicit F: GenConcurrent[F, _]): F[Queue[F, A]] = bounded(Int.MaxValue) + def unboundedIn[F[_]: Sync, G[_]: Async, A]: F[AsyncQueue[G, A]] = + boundedIn[F, G, A](Int.MaxValue) + /** * Constructs an empty, bounded, dropping queue holding up to `capacity` * elements for `F` data types that are [[cats.effect.kernel.GenConcurrent]]. When the queue is full @@ -106,7 +127,15 @@ object Queue { */ def dropping[F[_], A](capacity: Int)(implicit F: GenConcurrent[F, _]): F[Queue[F, A]] = { assertPositive(capacity, "Dropping") - F.ref(State.empty[F, A]).map(new DroppingQueue(capacity, _)) + F.ref(State.empty[Deferred[F, *], A]) + .map(new GenConcurrentImpl(capacity, _, Strategy.dropping)) + } + + def droppingIn[F[_]: Sync, G[_]: Async, A](capacity: Int): F[AsyncQueue[G, A]] = { + assertPositive(capacity, "Dropping") + Ref + .syncIn[F, G, State[AsyncDeferredLike, A]](State.empty) + .map(new AsyncImpl(capacity, _, Strategy.dropping)) } /** @@ -123,7 +152,15 @@ object Queue { def circularBuffer[F[_], A](capacity: Int)( implicit F: GenConcurrent[F, _]): F[Queue[F, A]] = { assertPositive(capacity, "CircularBuffer") - F.ref(State.empty[F, A]).map(new CircularBufferQueue(capacity, _)) + F.ref(State.empty[Deferred[F, *], A]) + .map(new GenConcurrentImpl(capacity, _, Strategy.circularBuffer)) + } + + def circularBufferIn[F[_]: Sync, G[_]: Async, A](capacity: Int): F[AsyncQueue[G, A]] = { + assertPositive(capacity, "CircularBuffer") + Ref + .syncIn[F, G, State[AsyncDeferredLike, A]](State.empty) + .map(new AsyncImpl(capacity, _, Strategy.circularBuffer)) } private def assertNonNegative(capacity: Int): Unit = @@ -138,34 +175,77 @@ object Queue { s"$name queue capacity must be positive, was: $capacity") else () - private sealed abstract class AbstractQueue[F[_], A]( - capacity: Int, - state: Ref[F, State[F, A]] - )(implicit F: GenConcurrent[F, _]) - extends Queue[F, A] { + private sealed trait DeferredOps[F[_], D[_]] { + def get[A](d: D[A]): F[A] + def complete[A](d: D[A], a: A): F[Unit] + def mkD[A]: F[D[A]] + } - protected def onOfferNoCapacity( - s: State[F, A], - a: A, - offerer: Deferred[F, Unit], - poll: Poll[F] - ): (State[F, A], F[Unit]) + implicit private def opsForAsync[F[_]]( + implicit F: Async[F]): DeferredOps[F, AsyncDeferredLike] = + new DeferredOps[F, AsyncDeferredLike] { + def complete[A](d: AsyncDeferredLike[A], b: A): F[Unit] = + d.transformAsync[F].complete(b).void + + def get[A](db: AsyncDeferredLike[A]): F[A] = db.transformAsync[F].get + + def mkD[A]: F[AsyncDeferredLike[A]] = Deferred.async[F, A].widen + } + + implicit private def opsForGenConcurrent[F[_]]( + implicit F: GenConcurrent[F, _]): DeferredOps[F, Deferred[F, *]] = + new DeferredOps[F, Deferred[F, *]] { + def complete[A](d: Deferred[F, A], b: A): F[Unit] = + d.complete(b).void + + def get[A](db: Deferred[F, A]): F[A] = db.get + + def mkD[A]: F[Deferred[F, A]] = F.deferred + } + + private final class GenConcurrentImpl[F[_], A]( + protected val capacity: Int, + protected val state: Ref[F, State[Deferred[F, *], A]], + protected val strategy: Strategy)( + implicit protected val F: GenConcurrent[F, _], + protected val ops: DeferredOps[F, Deferred[F, *]]) + extends Queue[F, A] + with Impl[F, Deferred[F, *], A] + + private final class AsyncImpl[F[_], A]( + protected val capacity: Int, + protected val state: SyncRef[F, State[AsyncDeferredLike, A]], + protected val strategy: Strategy)( + implicit protected val F: Async[F], + protected val ops: DeferredOps[F, AsyncDeferredLike]) + extends AsyncQueue[F, A] + with Impl[F, AsyncDeferredLike, A] { + def transformAsync[G[_]: Async]: AsyncQueue[G, A] = + new AsyncImpl[G, A](capacity, state.transformSync[G], strategy) + } - protected def onTryOfferNoCapacity(s: State[F, A], a: A): (State[F, A], F[Boolean]) + private sealed trait Impl[F[_], D[_], A] extends Queue[F, A] { + + protected def capacity: Int + protected def state: Ref[F, State[D, A]] + protected def strategy: Strategy + + implicit protected def F: MonadCancel[F, _] + implicit protected def ops: DeferredOps[F, D] def offer(a: A): F[Unit] = - F.deferred[Unit].flatMap { offerer => + ops.mkD[Unit].flatMap { offerer => F.uncancelable { poll => state.modify { case State(queue, size, takers, offerers) if takers.nonEmpty => val (taker, rest) = takers.dequeue - State(queue, size, rest, offerers) -> taker.complete(a).void + State(queue, size, rest, offerers) -> ops.complete(taker, a) case State(queue, size, takers, offerers) if size < capacity => State(queue.enqueue(a), size + 1, takers, offerers) -> F.unit case s => - onOfferNoCapacity(s, a, offerer, poll) + strategy.onOfferNoCapacity[F, D, A](s, a, offerer, poll, state) }.flatten } } @@ -175,19 +255,19 @@ object Queue { .modify { case State(queue, size, takers, offerers) if takers.nonEmpty => val (taker, rest) = takers.dequeue - State(queue, size, rest, offerers) -> taker.complete(a).as(true) + State(queue, size, rest, offerers) -> ops.complete(taker, a).as(true) case State(queue, size, takers, offerers) if size < capacity => State(queue.enqueue(a), size + 1, takers, offerers) -> F.pure(true) case s => - onTryOfferNoCapacity(s, a) + strategy.onTryOfferNoCapacity[F, D, A](s, a) } .flatten .uncancelable val take: F[A] = - F.deferred[A].flatMap { taker => + ops.mkD[A].flatMap { taker => F.uncancelable { poll => state.modify { case State(queue, size, takers, offerers) if queue.nonEmpty && offerers.isEmpty => @@ -197,16 +277,16 @@ object Queue { case State(queue, size, takers, offerers) if queue.nonEmpty => val (a, rest) = queue.dequeue val ((move, release), tail) = offerers.dequeue - State(rest.enqueue(move), size, takers, tail) -> release.complete(()).as(a) + State(rest.enqueue(move), size, takers, tail) -> ops.complete(release, ()).as(a) case State(queue, size, takers, offerers) if offerers.nonEmpty => val ((a, release), rest) = offerers.dequeue - State(queue, size, takers, rest) -> release.complete(()).as(a) + State(queue, size, takers, rest) -> ops.complete(release, ()).as(a) case State(queue, size, takers, offerers) => - val cleanup = state.update { s => s.copy(takers = s.takers.filter(_ ne taker)) } + val cleanup = state.update { s => s.copy(takers = s.takers.filter(_ != taker)) } State(queue, size, takers.enqueue(taker), offerers) -> - poll(taker.get).onCancel(cleanup) + poll(ops.get(taker)).onCancel(cleanup) }.flatten } } @@ -221,11 +301,13 @@ object Queue { case State(queue, size, takers, offerers) if queue.nonEmpty => val (a, rest) = queue.dequeue val ((move, release), tail) = offerers.dequeue - State(rest.enqueue(move), size, takers, tail) -> release.complete(()).as(a.some) + State(rest.enqueue(move), size, takers, tail) -> ops + .complete(release, ()) + .as(a.some) case State(queue, size, takers, offerers) if offerers.nonEmpty => val ((a, release), rest) = offerers.dequeue - State(queue, size, takers, rest) -> release.complete(()).as(a.some) + State(queue, size, takers, rest) -> ops.complete(release, ()).as(a.some) case s => s -> F.pure(none[A]) @@ -237,75 +319,102 @@ object Queue { } - private final class BoundedQueue[F[_], A](capacity: Int, state: Ref[F, State[F, A]])( - implicit F: GenConcurrent[F, _] - ) extends AbstractQueue(capacity, state) { - - protected def onOfferNoCapacity( - s: State[F, A], + private sealed abstract class Strategy { + def onOfferNoCapacity[F[_], D[_], A]( + s: State[D, A], a: A, - offerer: Deferred[F, Unit], - poll: Poll[F] - ): (State[F, A], F[Unit]) = { - val State(queue, size, takers, offerers) = s - val cleanup = state.update { s => s.copy(offerers = s.offerers.filter(_._2 ne offerer)) } - State(queue, size, takers, offerers.enqueue(a -> offerer)) -> poll(offerer.get) - .onCancel(cleanup) - } - - protected def onTryOfferNoCapacity(s: State[F, A], a: A): (State[F, A], F[Boolean]) = - s -> F.pure(false) - + offerer: D[Unit], + poll: Poll[F], + state: Ref[F, State[D, A]] + )( + implicit F: MonadCancel[F, _], + ops: DeferredOps[F, D] + ): (State[D, A], F[Unit]) + + def onTryOfferNoCapacity[F[_], D[_], A](s: State[D, A], a: A)( + implicit F: Applicative[F] + ): (State[D, A], F[Boolean]) } - private final class DroppingQueue[F[_], A](capacity: Int, state: Ref[F, State[F, A]])( - implicit F: GenConcurrent[F, _] - ) extends AbstractQueue(capacity, state) { + private object Strategy { + + object bounded extends Strategy { + + override def onOfferNoCapacity[F[_], D[_], A]( + s: State[D, A], + a: A, + offerer: D[Unit], + poll: Poll[F], + state: Ref[F, State[D, A]] + )( + implicit F: MonadCancel[F, _], + ops: DeferredOps[F, D] + ): (State[D, A], F[Unit]) = { + val State(queue, size, takers, offerers) = s + val cleanup = state.update { s => + s.copy(offerers = s.offerers.filter(_._2 != offerer)) + } + State(queue, size, takers, offerers.enqueue(a -> offerer)) -> poll(ops.get(offerer)) + .onCancel(cleanup) + } - protected def onOfferNoCapacity( - s: State[F, A], - a: A, - offerer: Deferred[F, Unit], - poll: Poll[F] - ): (State[F, A], F[Unit]) = - s -> F.unit + override def onTryOfferNoCapacity[F[_], D[_], A](s: State[D, A], a: A)( + implicit F: Applicative[F] + ): (State[D, A], F[Boolean]) = + s -> F.pure(false) - protected def onTryOfferNoCapacity(s: State[F, A], a: A): (State[F, A], F[Boolean]) = - s -> F.pure(false) - } + } - private final class CircularBufferQueue[F[_], A](capacity: Int, state: Ref[F, State[F, A]])( - implicit F: GenConcurrent[F, _] - ) extends AbstractQueue(capacity, state) { + object dropping extends Strategy { - protected def onOfferNoCapacity( - s: State[F, A], - a: A, - offerer: Deferred[F, Unit], - poll: Poll[F] - ): (State[F, A], F[Unit]) = { - // dotty doesn't like cats map on tuples - val (ns, fb) = onTryOfferNoCapacity(s, a) - (ns, fb.void) - } + override def onOfferNoCapacity[F[_], D[_], A]( + s: State[D, A], + a: A, + offerer: D[Unit], + poll: Poll[F], + state: Ref[F, State[D, A]] + )(implicit F: MonadCancel[F, _], ops: DeferredOps[F, D]): (State[D, A], F[Unit]) = + s -> F.unit - protected def onTryOfferNoCapacity(s: State[F, A], a: A): (State[F, A], F[Boolean]) = { - val State(queue, size, takers, offerers) = s - val (_, rest) = queue.dequeue - State(rest.enqueue(a), size, takers, offerers) -> F.pure(true) + override def onTryOfferNoCapacity[F[_], D[_], A](s: State[D, A], a: A)( + implicit F: Applicative[F]): (State[D, A], F[Boolean]) = + s -> F.pure(false) } + object circularBuffer extends Strategy { + + override def onOfferNoCapacity[F[_], D[_], A]( + s: State[D, A], + a: A, + offerer: D[Unit], + poll: Poll[F], + state: Ref[F, State[D, A]])( + implicit F: MonadCancel[F, _], + ops: DeferredOps[F, D]): (State[D, A], F[Unit]) = { + // dotty doesn't like cats map on tuples + val (ns, fb) = onTryOfferNoCapacity[F, D, A](s, a) + (ns, fb.void) + } + + override def onTryOfferNoCapacity[F[_], D[_], A](s: State[D, A], a: A)( + implicit F: Applicative[F]): (State[D, A], F[Boolean]) = { + val State(queue, size, takers, offerers) = s + val (_, rest) = queue.dequeue + State(rest.enqueue(a), size, takers, offerers) -> F.pure(true) + } + + } } - private final case class State[F[_], A]( + private final case class State[D[_], A]( queue: ScalaQueue[A], size: Int, - takers: ScalaQueue[Deferred[F, A]], - offerers: ScalaQueue[(A, Deferred[F, Unit])] + takers: ScalaQueue[D[A]], + offerers: ScalaQueue[(A, D[Unit])] ) private object State { - def empty[F[_], A]: State[F, A] = + def empty[D[_], A]: State[D, A] = State(ScalaQueue.empty, 0, ScalaQueue.empty, ScalaQueue.empty) }