Skip to content

Commit

Permalink
Stream - GroupWithin - Some changes.
Browse files Browse the repository at this point in the history
- Rename the `n` parameter as `outputSize`

- For `dequeueN` and other internal functions, no need to pass the `n`
  parameter since that is just the `outputSize` parameter of main function.
- Extract auxiliary `toEnding` function.
  • Loading branch information
diesalbla committed Sep 18, 2021
1 parent eaba366 commit 33f401b
Showing 1 changed file with 70 additions and 72 deletions.
142 changes: 70 additions & 72 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1333,29 +1333,38 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
go(None, this).stream
}

/** Divides this stream into chunks of elements of size `n`.
* Each time a group of size `n` is emitted, `timeout` is reset.
/** Splits this stream into a stream of chunks of elements, such that each
* chunk has at most `outputSize` elements.
*
* As `this` stream emits input elements, the resulting stream holds
* those elements in a waiting area until it has enough elements to emit
* next chunk.
* To avoid holding inputs for too long, a `timeout` is given.
* This timeout is reset whenever an output chunk is emitted.
*
* If the current chunk does not reach size `n` by the time the
* `timeout` period elapses, it emits a chunk containing however
* many elements have been accumulated so far, and resets
* `timeout`.
*
* However, if no elements at all have been accumulated when
* `timeout` expires, empty chunks are *not* emitted, and `timeout`
* is not reset.
* Instead, the next chunk to arrive is emitted immediately (since
* the stream is still in a timed out state), and only then is
* `timeout` reset. If the chunk received in a timed out state is
* bigger than `n`, the first `n` elements of it are emitted
* immediately in a chunk, `timeout` is reset, and the remaining
* elements are used for the next chunk.
*
* When the stream terminates, any accumulated elements are emitted
* many elements have been accumulated so far, and resets `timeout`.
*
* However, if no input elements are being held when the `timeout` expires,
* the output stream enters into a "timed out" state. From this state,
* as soon as `this` stream emits the next input, the resulting stream
* will emit the next output chunk, obtained as the prefix of length
* `outputSize` from the input chunk (or the whole of it if smaller),
* and start holding the remaining elements for the next chunk.
*
* When the input stream terminates, any accumulated elements are emitted
* immediately in a chunk, even if `timeout` has not expired.
*
* Reverse: barring timing effects, calling `unchunks` after
* `str.groupWithin(s, t).unchunks` gives back the original stream.
*
* @param outputChunkSize the maximum size of chunks emitted by resulting stream.
* @param timeout maximum time that input elements are held in the inner buffer
* before being emitted by the resulting stream.
*/
def groupWithin[F2[x] >: F[x]](
n: Int,
outputChunkSize: Int,
timeout: FiniteDuration
)(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] = {

Expand All @@ -1373,96 +1382,85 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
}
}

val outputLong = outputChunkSize.toLong
fs2.Stream.force {
for {
demand <- Semaphore[F2](n.toLong)
demand <- Semaphore[F2](outputLong)
supply <- Semaphore[F2](0L)
buffer <- Ref[F2].of(
JunctionBuffer[O](Vector.empty[O], endOfSupply = None, endOfDemand = None)
)
} yield {
/* - Buffer: stores items from input to be sent on next output chunk
* - Demand Semaphore: to avoid adding too many items to buffer
* - Supply: counts filled positions for next output chunk */
def enqueue(t: O): F2[Boolean] =
for {
_ <- demand.acquire
buf <- buffer.modify(buf => (buf.copy(buf.data :+ t), buf))
_ <- supply.release
} yield buf.endOfDemand.isEmpty

def waitN(s: Semaphore[F2]) =
F.guaranteeCase(s.acquireN(n.toLong)) {
case Outcome.Succeeded(_) => s.releaseN(n.toLong)
val dequeueNextOutput: F2[Option[Vector[O]]] = {
// Trigger: waits until the supply buffer is full (with acquireN)
val waitSupply = supply.acquireN(outputLong).guaranteeCase {
case Outcome.Succeeded(_) => supply.releaseN(outputLong)
case _ => F.unit
}

def acquireSupplyUpToNWithin(n: Long): F2[Long] =
// in JS cancellation doesn't always seem to run, so race conditions should restore state on their own
F.race(
F.sleep(timeout),
waitN(supply)
).flatMap {
case Left(_) =>
for {
_ <- supply.acquire
m <- supply.available
k = m.min(n - 1)
b <- supply.tryAcquireN(k)
} yield if (b) k + 1 else 1
case Right(_) =>
supply.acquireN(n) *> F.pure(n)
}
val onTimeout: F2[Long] =
for {
_ <- supply.acquire // waits until there is at least one element in buffer
m <- supply.available
k = m.min(outputLong - 1)
b <- supply.tryAcquireN(k)
} yield if (b) k + 1 else 1

def dequeueN(n: Int): F2[Option[Vector[O]]] =
acquireSupplyUpToNWithin(n.toLong).flatMap { n =>
buffer
.modify(_.splitAt(n.toInt))
.flatMap { buf =>
demand.releaseN(buf.data.size.toLong).flatMap { _ =>
buf.endOfSupply match {
case Some(Left(error)) =>
F.raiseError(error)
case Some(Right(_)) if buf.data.isEmpty =>
F.pure(None)
case _ =>
F.pure(Some(buf.data))
}
}
}
}
// in JS cancellation doesn't always seem to run, so race conditions should restore state on their own
for {
raced <- F.race(F.sleep(timeout), waitSupply)
acq <- if (raced.isLeft) onTimeout else supply.acquireN(outputLong).as(outputLong)
buf <- buffer.modify(_.splitAt(acq.toInt))
_ <- demand.releaseN(buf.data.size.toLong)
res <- buf.endOfSupply match {
case Some(Left(error)) => F.raiseError(error)
case Some(Right(_)) if buf.data.isEmpty => F.pure(None)
case _ => F.pure(Some(buf.data))
}
} yield res
}

def endSupply(result: Either[Throwable, Unit]): F2[Unit] =
buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(Int.MaxValue)

def endDemand(result: Either[Throwable, Unit]): F2[Unit] =
buffer.update(_.copy(endOfDemand = Some(result))) *> demand.releaseN(Int.MaxValue)

def toEnding(ec: ExitCase): Either[Throwable, Unit] = ec match {
case ExitCase.Succeeded => Right(())
case ExitCase.Errored(e) => Left(e)
case ExitCase.Canceled => Right(())
}

val enqueueAsync = F.start {
this
.evalMap(enqueue)
.forall(identity)
.onFinalizeCase {
case ExitCase.Succeeded => endSupply(Right(()))
case ExitCase.Errored(e) => endSupply(Left(e))
case ExitCase.Canceled => endSupply(Right(()))
}
.onFinalizeCase(ec => endSupply(toEnding(ec)))
.compile
.drain
}

val outputStream: Stream[F2, Chunk[O]] =
fs2.Stream
.eval(dequeueNextOutput)
.repeat
.collectWhile { case Some(data) => Chunk.vector(data) }

fs2.Stream
.bracketCase(enqueueAsync) { case (upstream, exitCase) =>
val ending = exitCase match {
case ExitCase.Succeeded => Right(())
case ExitCase.Errored(e) => Left(e)
case ExitCase.Canceled => Right(())
}
endDemand(ending) *> upstream.cancel
}
.flatMap { _ =>
fs2.Stream
.eval(dequeueN(n))
.repeat
.collectWhile { case Some(data) => Chunk.vector(data) }
endDemand(toEnding(exitCase)) *> upstream.cancel
}
.flatMap(_ => outputStream)
}
}
}
Expand Down

0 comments on commit 33f401b

Please sign in to comment.