Stream - GroupWithin - Some changes. #2619

Merged (1 commit, Sep 21, 2021)
152 changes: 76 additions & 76 deletions core/shared/src/main/scala/fs2/Stream.scala
@@ -1333,29 +1333,38 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
go(None, this).stream
}

/** Divides this stream into chunks of elements of size `n`.
* Each time a group of size `n` is emitted, `timeout` is reset.
*
* If the current chunk does not reach size `n` by the time the
* `timeout` period elapses, it emits a chunk containing however
* many elements have been accumulated so far, and resets
* `timeout`.
*
* However, if no elements at all have been accumulated when
* `timeout` expires, empty chunks are *not* emitted, and `timeout`
* is not reset.
* Instead, the next chunk to arrive is emitted immediately (since
* the stream is still in a timed out state), and only then is
* `timeout` reset. If the chunk received in a timed out state is
* bigger than `n`, the first `n` elements of it are emitted
* immediately in a chunk, `timeout` is reset, and the remaining
* elements are used for the next chunk.
*
* When the stream terminates, any accumulated elements are emitted
/** Splits this stream into a stream of chunks of elements, such that
* 1. each chunk in the output has at most `chunkSize` elements, and
* 2. the concatenation of those chunks, which is obtained by calling
* `unchunks`, yields the same element sequence as this stream.
*
* As `this` stream emits input elements, the resulting stream accumulates them
* in a waiting buffer, until it has enough elements to emit the next chunk.
*
* To avoid holding input elements for too long, this method takes a
* `timeout`. This timeout is reset after each output chunk is emitted.
*
* When the timeout expires, if the buffer contains any elements, then
* all elements in the buffer are emitted in an output chunk, even if
* there are fewer than `chunkSize` elements, and the timeout is reset.
*
* However, if the buffer is empty when the `timeout` expires, then the
* output stream enters a "timed out" state. From that state, as soon as
* `this` stream emits the next chunk of input, the resulting stream
* emits its next output chunk and resets the timeout again. If that input
* chunk is shorter than the `chunkSize`, it is emitted whole. Otherwise,
* only the first `chunkSize` elements are emitted, and the rest are put
* in the buffer.
*
* When the input stream terminates, any accumulated elements are emitted
* immediately in a chunk, even if `timeout` has not expired.
*
* @param chunkSize the maximum size of chunks emitted by the resulting stream.
* @param timeout maximum time that input elements are held in the buffer
* before being emitted by the resulting stream.
*/
def groupWithin[F2[x] >: F[x]](
n: Int,
chunkSize: Int,
timeout: FiniteDuration
)(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] = {

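To make the documented semantics concrete, here is a small usage sketch (not part of the diff itself; the producer rate, the chunk size of 3, and the 250ms timeout are illustrative assumptions):

import scala.concurrent.duration._
import cats.effect.{IO, IOApp}
import fs2.Stream

// A slow producer emits one element roughly every 100ms; groupWithin then emits
// chunks of at most 3 elements, or whatever has accumulated when the 250ms
// timeout fires, whichever happens first.
object GroupWithinSketch extends IOApp.Simple {
  def run: IO[Unit] =
    Stream
      .iterate(0)(_ + 1)
      .covary[IO]
      .metered(100.millis)
      .groupWithin(3, 250.millis)
      .evalMap(chunk => IO.println(s"emitted: ${chunk.toList}"))
      .interruptAfter(2.seconds)
      .compile
      .drain
}

With a fast producer the chunks come out full; with a slow one the timeout forces smaller chunks, matching the behaviour described in the new scaladoc above.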
@@ -1373,96 +1382,87 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
}
}

val outputLong = chunkSize.toLong
fs2.Stream.force {
for {
demand <- Semaphore[F2](n.toLong)
demand <- Semaphore[F2](outputLong)
supply <- Semaphore[F2](0L)
buffer <- Ref[F2].of(
JunctionBuffer[O](Vector.empty[O], endOfSupply = None, endOfDemand = None)
)
} yield {
/* - Buffer: stores items from input to be sent on next output chunk
* - Demand Semaphore: to avoid adding too many items to buffer
* - Supply: counts filled positions for next output chunk */
def enqueue(t: O): F2[Boolean] =
for {
_ <- demand.acquire
buf <- buffer.modify(buf => (buf.copy(buf.data :+ t), buf))
_ <- supply.release
} yield buf.endOfDemand.isEmpty

def waitN(s: Semaphore[F2]) =
F.guaranteeCase(s.acquireN(n.toLong)) {
case Outcome.Succeeded(_) => s.releaseN(n.toLong)
val dequeueNextOutput: F2[Option[Vector[O]]] = {
// Trigger: waits until the supply buffer is full (with acquireN)
val waitSupply = supply.acquireN(outputLong).guaranteeCase {
case Outcome.Succeeded(_) => supply.releaseN(outputLong)
case _ => F.unit
}

def acquireSupplyUpToNWithin(n: Long): F2[Long] =
// in JS cancellation doesn't always seem to run, so race conditions should restore state on their own
F.race(
F.sleep(timeout),
waitN(supply)
).flatMap {
case Left(_) =>
for {
_ <- supply.acquire
m <- supply.available
k = m.min(n - 1)
b <- supply.tryAcquireN(k)
} yield if (b) k + 1 else 1
case Right(_) =>
supply.acquireN(n) *> F.pure(n)
}
val onTimeout: F2[Long] =
for {
_ <- supply.acquire // waits until there is at least one element in buffer
m <- supply.available
k = m.min(outputLong - 1)
b <- supply.tryAcquireN(k)
} yield if (b) k + 1 else 1

def dequeueN(n: Int): F2[Option[Vector[O]]] =
acquireSupplyUpToNWithin(n.toLong).flatMap { n =>
buffer
.modify(_.splitAt(n.toInt))
.flatMap { buf =>
demand.releaseN(buf.data.size.toLong).flatMap { _ =>
buf.endOfSupply match {
case Some(Left(error)) =>
F.raiseError(error)
case Some(Right(_)) if buf.data.isEmpty =>
F.pure(None)
case _ =>
F.pure(Some(buf.data))
}
}
}
}
// in JS cancellation doesn't always seem to run, so race conditions should restore state on their own
for {
acq <- F.race(F.sleep(timeout), waitSupply).flatMap {
case Left(_) => onTimeout
case Right(_) => supply.acquireN(outputLong).as(outputLong)
}
buf <- buffer.modify(_.splitAt(acq.toInt))
_ <- demand.releaseN(buf.data.size.toLong)
res <- buf.endOfSupply match {
case Some(Left(error)) => F.raiseError(error)
case Some(Right(_)) if buf.data.isEmpty => F.pure(None)
case _ => F.pure(Some(buf.data))
}
} yield res
}

def endSupply(result: Either[Throwable, Unit]): F2[Unit] =
buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(Int.MaxValue)

def endDemand(result: Either[Throwable, Unit]): F2[Unit] =
buffer.update(_.copy(endOfDemand = Some(result))) *> demand.releaseN(Int.MaxValue)

def toEnding(ec: ExitCase): Either[Throwable, Unit] = ec match {
case ExitCase.Succeeded => Right(())
case ExitCase.Errored(e) => Left(e)
case ExitCase.Canceled => Right(())
}

val enqueueAsync = F.start {
this
.evalMap(enqueue)
.forall(identity)
.onFinalizeCase {
case ExitCase.Succeeded => endSupply(Right(()))
case ExitCase.Errored(e) => endSupply(Left(e))
case ExitCase.Canceled => endSupply(Right(()))
}
.onFinalizeCase(ec => endSupply(toEnding(ec)))
.compile
.drain
}

fs2.Stream
val outputStream: Stream[F2, Chunk[O]] =
Stream
.eval(dequeueNextOutput)
.repeat
.collectWhile { case Some(data) => Chunk.vector(data) }

Stream
.bracketCase(enqueueAsync) { case (upstream, exitCase) =>
val ending = exitCase match {
case ExitCase.Succeeded => Right(())
case ExitCase.Errored(e) => Left(e)
case ExitCase.Canceled => Right(())
}
endDemand(ending) *> upstream.cancel
}
.flatMap { _ =>
fs2.Stream
.eval(dequeueN(n))
.repeat
.collectWhile { case Some(data) => Chunk.vector(data) }
}
endDemand(toEnding(exitCase)) *> upstream.cancel
} >> outputStream
}
}
}
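For orientation, the refactored implementation keeps the scheme described in the Buffer/Demand/Supply comment above: the `demand` semaphore starts at `chunkSize` and bounds how many elements the upstream may park in the buffer, while the `supply` semaphore starts at 0 and counts how many buffered elements are ready for the next output chunk; `dequeueNextOutput` then races `F.sleep(timeout)` against `waitSupply` to decide whether to take a full chunk or only what is available. Below is a minimal standalone model of just that two-semaphore handshake, assuming cats-effect 3; the names are illustrative and are not part of this PR:

import cats.effect.{IO, Ref}
import cats.effect.std.Semaphore
import cats.syntax.all._

// Simplified model of the supply/demand scheme (illustrative only):
//  - `demand` starts at `chunkSize` and limits how many elements may sit in the buffer.
//  - `supply` starts at 0 and counts buffered elements ready to be taken.
final class TwoSemaphoreBuffer[A] private (
    demand: Semaphore[IO],
    supply: Semaphore[IO],
    buffer: Ref[IO, Vector[A]]
) {
  // Producer side: wait for a free slot, append, then signal one unit of supply.
  def enqueue(a: A): IO[Unit] =
    demand.acquire *> buffer.update(_ :+ a) *> supply.release

  // Consumer side: wait until `n` elements are available, take them, and hand
  // the freed slots back to the producer.
  def dequeueN(n: Long): IO[Vector[A]] =
    supply.acquireN(n) *>
      buffer
        .modify(v => (v.drop(n.toInt), v.take(n.toInt)))
        .flatTap(taken => demand.releaseN(taken.size.toLong))
}

object TwoSemaphoreBuffer {
  def apply[A](chunkSize: Long): IO[TwoSemaphoreBuffer[A]] =
    (Semaphore[IO](chunkSize), Semaphore[IO](0L), Ref[IO].of(Vector.empty[A]))
      .mapN((d, s, b) => new TwoSemaphoreBuffer(d, s, b))
}

In the actual diff the consumer side is richer: on timeout it first acquires a single permit (so it always waits for at least one element) and then `tryAcquireN`s whatever else is available up to `outputLong - 1`, which is how partially filled chunks are emitted when the timeout fires.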