diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index eebc94806c..a6c1cb10a2 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2378,28 +2378,47 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, ): Stream[F2, O] = Stream.suspend { assert(maxFactor >= minFactor, "maxFactor should be greater or equal to minFactor") - val random = new scala.util.Random(seed) - def factor: Double = Math.abs(random.nextInt()) % (maxFactor - minFactor) + minFactor - def nextSize(sourceSize: Int): Int = (factor * sourceSize).toInt + underlying.uncons.flatMap { + case None => Pull.done + case Some((hd, tl)) => + val random = new scala.util.Random(seed) + def factor: Double = Math.abs(random.nextInt()) % (maxFactor - minFactor) + minFactor + + def nextSize(sourceSize: Int): Int = (factor * sourceSize).toInt + + def go( + acc: Chunk[O], + sizeOpt: Int, + lastChunkSize: Int, + s: Pull[F2, O, Unit] + ): Pull[F2, O, Unit] = { + + val size = if (sizeOpt > 0) sizeOpt else nextSize(lastChunkSize) - def go(acc: Chunk[O], sizeOpt: Int, s: Pull[F2, O, Unit]): Pull[F2, O, Unit] = - s.uncons.flatMap { - case Some((hd, tl)) => - val size = if (sizeOpt > 0) sizeOpt else nextSize(hd.size) if (acc.size < size) - go(acc ++ hd, size, tl) + s.uncons.flatMap { + case None => Pull.output(acc) + case Some((hd, tl)) => + go(acc ++ hd, size, hd.size, tl) + } else if (acc.size == size) - Pull.output(acc) >> go(hd, size, tl) + Pull.output(acc) >> + s.uncons.flatMap { + case None => Pull.done + case Some((hd, tl)) => + go(hd, size, hd.size, tl) + } else { val (out, rem) = acc.splitAt(size - 1) - Pull.output(out) >> go(rem ++ hd, -1, tl) + Pull.output(out) >> go(rem, -1, lastChunkSize, s) + } - case None => - Pull.output(acc) - } - go(Chunk.empty, -1, underlying).stream + } + + go(hd, -1, hd.size, tl) + }.stream } /** Rechunks the stream such that output chunks are within [inputChunk.size * minFactor, inputChunk.size * maxFactor]. diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 680c494e3e..1f2c4e7d59 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -1354,6 +1354,16 @@ class StreamCombinatorsSuite extends Fs2Suite { } } } + + test("correctly rechunk big chunks at the end of a stream") { + val chunks = Stream + .chunk(Chunk.seq(List.fill(5000)(1))) + .rechunkRandomlyWithSeed(0.01, 0.1)(1L) + .chunks + .compile + .toList + assert(chunks.forall(_.size <= 500)) + } } group("rechunkRandomly") {