Skip to content

Commit

Permalink
Reduce allocations on StreamSubscription
Browse files Browse the repository at this point in the history
  • Loading branch information
BalmungSan committed Feb 26, 2023
1 parent 0c60231 commit f52d5d2
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 81 deletions.
163 changes: 88 additions & 75 deletions core/jvm/src/main/scala/fs2/interop/flow/StreamSubscription.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ package fs2
package interop
package flow

import cats.effect.kernel.{Async, Deferred, Resource, Outcome}
import cats.effect.std.{Dispatcher, Queue}
import cats.effect.kernel.{Async, Outcome}
import cats.effect.syntax.all._
import cats.syntax.all._

import java.util.concurrent.Flow.{Subscription, Subscriber}
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}

/** Implementation of a [[Subscription]].
*
Expand All @@ -39,35 +39,50 @@ import java.util.concurrent.Flow.{Subscription, Subscriber}
private[flow] final class StreamSubscription[F[_], A] private (
stream: Stream[F, A],
sub: Subscriber[A],
requestDispatcher: Dispatcher[F],
requests: Queue[F, StreamSubscription.Request],
canceled: Deferred[F, Unit]
requests: AtomicLong,
resume: AtomicReference[() => Unit],
cancelToken: AtomicReference[() => Unit],
canceled: F[Unit]
)(implicit F: Async[F])
extends Subscription {
// Ensure we are on a terminal state; i.e. set `canceled`, before signaling the subscriber.
private def onError(ex: Throwable): F[Unit] =
cancelMe >> F.delay(sub.onError(ex))
// Ensure we are on a terminal state; i.e. call `cancel`, before signaling the subscriber.
private def onError(ex: Throwable): Unit = {
cancel()
sub.onError(ex)
}

private def onComplete: F[Unit] =
cancelMe >> F.delay(sub.onComplete())
private def onComplete(): Unit = {
cancel()
sub.onComplete()
}

private[flow] def run: F[Unit] = {
def subscriptionPipe: Pipe[F, A, A] =
in => {
def go(s: Stream[F, A]): Pull[F, A, Unit] =
Pull.eval(requests.take).flatMap {
case StreamSubscription.Request.Infinite =>
s.pull.echo

case StreamSubscription.Request.Finite(n) =>
val subscriptionPipe: Pipe[F, A, A] = in => {
def go(s: Stream[F, A]): Pull[F, A, Unit] =
Pull.eval(F.delay(requests.get())).flatMap { n =>
if (n == Long.MaxValue)
s.pull.echo
else if (n == 0)
Pull.eval(F.async_[Unit] { cb =>
// If there aren't more pending request,
// we will wait until the next one.
resume.set(() => cb.apply(Either.unit))
// However, before blocking,
// we must check if it has been a concurrent request.
// In case it was, we abort the wait.
if (requests.get() > 0)
cb.apply(Either.unit)
}) >> go(s)
else
Pull.eval(F.delay(requests.updateAndGet(r => r - n))) >>
s.pull.take(n).flatMap {
case None => Pull.done
case Some(rem) => go(rem)
}
}
}

go(in).stream
}
go(in).stream
}

val events =
stream
Expand All @@ -77,94 +92,92 @@ private[flow] final class StreamSubscription[F[_], A] private (
.drain

events
.race(canceled.get)
.race(canceled)
.guaranteeCase {
case Outcome.Succeeded(result) =>
result.flatMap {
case Left(()) => onComplete // Events finished normally.
case Left(()) => F.delay(onComplete()) // Events finished normally.
case Right(()) => F.unit // Events was canceled.
}
case Outcome.Errored(ex) => onError(ex)
case Outcome.Canceled() => cancelMe
case Outcome.Errored(ex) => F.delay(onError(ex))
case Outcome.Canceled() => F.delay(onComplete())
}
.void
}

// According to the spec, it's acceptable for a concurrent cancel to not
// be processed immediately, but if you have synchronous `cancel();
// request()`, then the request _must_ be a NOOP. Fortunately,
// ordering is guaranteed by a sequential dispatcher.
// be processed immediately, but if you have synchronous
// `cancel(); request()`,
// then the request must be a NOOP.
// See https://github.com/zainab-ali/fs2-reactive-streams/issues/29
// and https://github.com/zainab-ali/fs2-reactive-streams/issues/46
private def cancelMe: F[Unit] =
canceled.complete(()).void

override def cancel(): Unit =
try
requestDispatcher.unsafeRunAndForget(cancelMe)
catch {
case _: IllegalStateException =>
// Dispatcher already shutdown, we are on terminal state, NOOP.
override def cancel(): Unit = {
var cancelMe = cancelToken.get()

if (cancelMe ne null) {
// Loop until we get the actual cancel callback.
while (cancelMe eq StreamSubscription.Sentinel)
cancelMe = cancelToken.get()

cancelToken.set(null)
cancelMe.apply()
}
}

override def request(n: Long): Unit = {
val prog =
canceled.tryGet.flatMap {
case None =>
if (n == java.lang.Long.MAX_VALUE)
requests.offer(StreamSubscription.Request.Infinite)
else if (n > 0)
requests.offer(StreamSubscription.Request.Finite(n))
else
onError(
ex = new IllegalArgumentException(s"Invalid number of elements [${n}]")
)
override def request(n: Long): Unit =
if (cancelToken.get() ne null) {
if (n <= 0)
onError(
ex = new IllegalArgumentException(s"Invalid number of elements [${n}]")
)
else {
requests.updateAndGet { r =>
val result = r + n
if (result < 0) {
// Overflow.
Long.MaxValue
} else {
result
}
}

case Some(()) =>
F.unit
resume.get().apply()
}
try
requestDispatcher.unsafeRunAndForget(prog)
catch {
case _: IllegalStateException =>
// Dispatcher already shutdown, we are on terminal state, NOOP.
}
}
}

private[flow] object StreamSubscription {

/** Represents a downstream subscriber's request to publish elements. */
private sealed trait Request
private object Request {
case object Infinite extends Request
final case class Finite(n: Long) extends Request
}
private final val Sentinel = () => ()

// Mostly for testing purposes.
def apply[F[_], A](stream: Stream[F, A], subscriber: Subscriber[A])(implicit
F: Async[F]
): Resource[F, StreamSubscription[F, A]] =
(
Dispatcher.sequential[F](await = true),
Resource.eval(Queue.unbounded[F, Request]),
Resource.eval(Deferred[F, Unit])
).mapN { case (requestDispatcher, requests, canceled) =>
): F[StreamSubscription[F, A]] =
F.delay {
val requests = new AtomicLong(0L)
val resume = new AtomicReference(Sentinel)
val cancelToken = new AtomicReference(Sentinel)
val canceled = F.async_[Unit] { cb =>
if (!cancelToken.compareAndSet(Sentinel, () => cb.apply(Either.unit))) {
cb.apply(Either.left(new Exception("Bom")))
}
}

new StreamSubscription(
stream,
subscriber,
requestDispatcher,
requests,
resume,
cancelToken,
canceled
)
}.evalTap { subscription =>
F.delay(subscriber.onSubscribe(subscription))
}

def subscribe[F[_], A](stream: Stream[F, A], subscriber: Subscriber[A])(implicit
F: Async[F]
): F[Unit] =
apply(stream, subscriber).use { subscription =>
subscription.run
apply(stream, subscriber).flatMap { subscription =>
F.delay(subscriber.onSubscribe(subscription)) >>
subscription.run
}
}
15 changes: 9 additions & 6 deletions core/jvm/src/test/scala/fs2/interop/flow/CancellationSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,27 @@ import java.util.concurrent.atomic.AtomicBoolean
* failures due to race conditions more repeatable
*/
class CancellationSpec extends Fs2Suite {
final class Sub[A](b: AtomicBoolean) extends Subscriber[A] {
def onNext(t: A): Unit = b.set(true)
final class DummySubscriber(b: AtomicBoolean, program: Subscription => Unit)
extends Subscriber[Int] {
def onNext(i: Int): Unit = b.set(true)
def onComplete(): Unit = b.set(true)
def onError(e: Throwable): Unit = b.set(true)
def onSubscribe(s: Subscription): Unit = ()
def onSubscribe(s: Subscription): Unit =
program(s)
}

val s = Stream.range(0, 5).covary[IO]

val attempts = 5000
val attempts = 10000

def testStreamSubscription(clue: String)(program: Subscription => Unit): IO[Unit] =
IO(new AtomicBoolean(false))
.flatMap { flag =>
StreamSubscription(s, new Sub(flag)).use { subscription =>
val subscriber = new DummySubscriber(flag, program)
StreamSubscription(s, subscriber).flatMap { subscription =>
(
subscription.run,
IO(program(subscription))
IO(subscriber.onSubscribe(subscription))
).parTupled
} >>
IO(flag.get()).assertEquals(false, clue)
Expand Down

0 comments on commit f52d5d2

Please sign in to comment.