Skip to content

Commit

Permalink
Merge pull request #3975 from djspiewak/feature/unsafe-queue
Browse files Browse the repository at this point in the history
  • Loading branch information
djspiewak authored May 30, 2024
2 parents 1ee6ec3 + 0572831 commit 1e445fb
Show file tree
Hide file tree
Showing 9 changed files with 470 additions and 149 deletions.
193 changes: 48 additions & 145 deletions std/shared/src/main/scala/cats/effect/std/Queue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,26 @@ object Queue {
private[effect] def unboundedForAsync[F[_], A](implicit F: Async[F]): F[Queue[F, A]] =
F.delay(new UnboundedAsyncQueue())

/**
* Creates a new `Queue` subject to some `capacity` bound which supports a side-effecting
* `tryOffer` function, allowing impure code to directly add values to the queue without
* indirecting through something like [[Dispatcher]]. This can improve performance
* significantly in some common cases. Note that the queue produced by this constructor can be
* used as a perfectly conventional [[Queue]] (as it is a subtype).
*
* @param capacity
* the maximum capacity of the queue (must be strictly greater than 1 and less than 32768)
* @return
* an empty bounded queue
* @see
* [[cats.effect.std.unsafe.BoundedQueue]]
*/
def unsafeBounded[F[_], A](capacity: Int)(
implicit F: Async[F]): F[unsafe.BoundedQueue[F, A]] = {
require(capacity > 1 && capacity < Short.MaxValue.toInt * 2)
F.delay(new BoundedAsyncQueue(capacity))
}

/**
* Constructs a queue through which a single element can pass only in the case when there are
* at least one taking fiber and at least one offering fiber for `F` data types that are
Expand Down Expand Up @@ -135,6 +155,21 @@ object Queue {
unboundedForConcurrent
}

/**
* Creates a new unbounded `Queue` which supports a side-effecting `offer` function, allowing
* impure code to directly add values to the queue without indirecting through something like
* [[Dispatcher]]. This can improve performance significantly in some common cases. Note that
* the queue produced by this constructor can be used as a perfectly conventional [[Queue]]
* (as it is a subtype).
*
* @return
* an empty unbounded queue
* @see
* [[cats.effect.std.unsafe.UnboundedQueue]]
*/
def unsafeUnbounded[F[_], A](implicit F: Async[F]): F[unsafe.UnboundedQueue[F, A]] =
F.delay(new UnboundedAsyncQueue())

/**
* Constructs an empty, bounded, dropping queue holding up to `capacity` elements for `F` data
* types that are [[cats.effect.kernel.GenConcurrent]]. When the queue is full (contains
Expand Down Expand Up @@ -542,7 +577,9 @@ object Queue {
* Does not correctly handle bound = 0 because take waiters are async[Unit]
*/
private final class BoundedAsyncQueue[F[_], A](capacity: Int)(implicit F: Async[F])
extends Queue[F, A] {
extends Queue[F, A]
with unsafe.BoundedQueue[F, A] {

require(capacity > 1)

private[this] val buffer = new UnsafeBounded[A](capacity)
Expand Down Expand Up @@ -625,7 +662,7 @@ object Queue {
}
}

def tryOffer(a: A): F[Boolean] = F delay {
def unsafeTryOffer(a: A): Boolean = {
try {
buffer.put(a)
notifyOne(takers)
Expand All @@ -636,6 +673,8 @@ object Queue {
}
}

def tryOffer(a: A): F[Boolean] = F.delay(unsafeTryOffer(a))

val size: F[Int] = F.delay(buffer.size())

val take: F[A] =
Expand Down Expand Up @@ -802,16 +841,21 @@ object Queue {
}
}

private final class UnboundedAsyncQueue[F[_], A]()(implicit F: Async[F]) extends Queue[F, A] {
private final class UnboundedAsyncQueue[F[_], A]()(implicit F: Async[F])
extends Queue[F, A]
with unsafe.UnboundedQueue[F, A] {

private[this] val buffer = new UnsafeUnbounded[A]()
private[this] val takers = new UnsafeUnbounded[Either[Throwable, Unit] => Unit]()
private[this] val FailureSignal = cats.effect.std.FailureSignal // prefetch

def offer(a: A): F[Unit] = F delay {
def unsafeOffer(a: A): Unit = {
buffer.put(a)
notifyOne()
}

def offer(a: A): F[Unit] = F.delay(unsafeOffer(a))

def tryOffer(a: A): F[Boolean] = F delay {
buffer.put(a)
notifyOne()
Expand Down Expand Up @@ -1110,144 +1154,3 @@ object Queue {
}
}
}

trait QueueSource[F[_], A] {

/**
* Dequeues an element from the front of the queue, possibly fiber blocking until an element
* becomes available.
*/
def take: F[A]

/**
* Attempts to dequeue an element from the front of the queue, if one is available without
* fiber blocking.
*
* @return
* an effect that describes whether the dequeueing of an element from the queue succeeded
* without blocking, with `None` denoting that no element was available
*/
def tryTake: F[Option[A]]

/**
* Attempts to dequeue elements from the front of the queue, if they are available without
* semantically blocking. This method does not guarantee any additional performance benefits
* beyond simply recursively calling [[tryTake]], though some implementations will provide a
* more efficient implementation.
*
* @param maxN
* The max elements to dequeue. Passing `None` will try to dequeue the whole queue.
*
* @return
* an effect that contains the dequeued elements
*/
def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] =
QueueSource.tryTakeN[F, A](maxN, tryTake)

def size: F[Int]
}

object QueueSource {

private[std] def tryTakeN[F[_], A](maxN: Option[Int], tryTake: F[Option[A]])(
implicit F: Monad[F]): F[List[A]] = {
QueueSource.assertMaxNPositive(maxN)

def loop(i: Int, limit: Int, acc: List[A]): F[List[A]] =
if (i >= limit)
F.pure(acc.reverse)
else
tryTake flatMap {
case Some(a) => loop(i + 1, limit, a :: acc)
case None => F.pure(acc.reverse)
}

maxN match {
case Some(limit) => loop(0, limit, Nil)
case None => loop(0, Int.MaxValue, Nil)
}
}

private[std] def assertMaxNPositive(maxN: Option[Int]): Unit = maxN match {
case Some(n) if n <= 0 =>
throw new IllegalArgumentException(s"Provided maxN parameter must be positive, was $n")
case _ => ()
}

implicit def catsFunctorForQueueSource[F[_]: Functor]: Functor[QueueSource[F, *]] =
new Functor[QueueSource[F, *]] {
override def map[A, B](fa: QueueSource[F, A])(f: A => B): QueueSource[F, B] =
new QueueSource[F, B] {
override def take: F[B] =
fa.take.map(f)
override def tryTake: F[Option[B]] = {
fa.tryTake.map(_.map(f))
}
override def size: F[Int] =
fa.size
}
}
}

trait QueueSink[F[_], A] {

/**
* Enqueues the given element at the back of the queue, possibly fiber blocking until
* sufficient capacity becomes available.
*
* @param a
* the element to be put at the back of the queue
*/
def offer(a: A): F[Unit]

/**
* Attempts to enqueue the given element at the back of the queue without semantically
* blocking.
*
* @param a
* the element to be put at the back of the queue
* @return
* an effect that describes whether the enqueuing of the given element succeeded without
* blocking
*/
def tryOffer(a: A): F[Boolean]

/**
* Attempts to enqueue the given elements at the back of the queue without semantically
* blocking. If an item in the list cannot be enqueued, the remaining elements will be
* returned. This is a convenience method that recursively runs `tryOffer` and does not offer
* any additional performance benefits.
*
* @param list
* the elements to be put at the back of the queue
* @return
* an effect that contains the remaining valus that could not be offered.
*/
def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] =
QueueSink.tryOfferN[F, A](list, tryOffer)

}

object QueueSink {

private[std] def tryOfferN[F[_], A](list: List[A], tryOffer: A => F[Boolean])(
implicit F: Monad[F]): F[List[A]] = list match {
case Nil => F.pure(list)
case h :: t =>
tryOffer(h).ifM(
tryOfferN(t, tryOffer),
F.pure(list)
)
}

implicit def catsContravariantForQueueSink[F[_]]: Contravariant[QueueSink[F, *]] =
new Contravariant[QueueSink[F, *]] {
override def contramap[A, B](fa: QueueSink[F, A])(f: B => A): QueueSink[F, B] =
new QueueSink[F, B] {
override def offer(b: B): F[Unit] =
fa.offer(f(b))
override def tryOffer(b: B): F[Boolean] =
fa.tryOffer(f(b))
}
}
}
83 changes: 83 additions & 0 deletions std/shared/src/main/scala/cats/effect/std/QueueSink.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2020-2024 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats
package effect
package std

import cats.syntax.all._

trait QueueSink[F[_], A] {

/**
* Enqueues the given element at the back of the queue, possibly fiber blocking until
* sufficient capacity becomes available.
*
* @param a
* the element to be put at the back of the queue
*/
def offer(a: A): F[Unit]

/**
* Attempts to enqueue the given element at the back of the queue without semantically
* blocking.
*
* @param a
* the element to be put at the back of the queue
* @return
* an effect that describes whether the enqueuing of the given element succeeded without
* blocking
*/
def tryOffer(a: A): F[Boolean]

/**
* Attempts to enqueue the given elements at the back of the queue without semantically
* blocking. If an item in the list cannot be enqueued, the remaining elements will be
* returned. This is a convenience method that recursively runs `tryOffer` and does not offer
* any additional performance benefits.
*
* @param list
* the elements to be put at the back of the queue
* @return
* an effect that contains the remaining valus that could not be offered.
*/
def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] =
QueueSink.tryOfferN(list, tryOffer)
}

object QueueSink {

private[std] def tryOfferN[F[_], A](list: List[A], tryOffer: A => F[Boolean])(
implicit F: Monad[F]): F[List[A]] = list match {
case Nil => F.pure(list)
case h :: t =>
tryOffer(h).ifM(
tryOfferN(t, tryOffer),
F.pure(list)
)
}

implicit def catsContravariantForQueueSink[F[_]]: Contravariant[QueueSink[F, *]] =
new Contravariant[QueueSink[F, *]] {
override def contramap[A, B](fa: QueueSink[F, A])(f: B => A): QueueSink[F, B] =
new QueueSink[F, B] {
override def offer(b: B): F[Unit] =
fa.offer(f(b))
override def tryOffer(b: B): F[Boolean] =
fa.tryOffer(f(b))
}
}
}
Loading

0 comments on commit 1e445fb

Please sign in to comment.