diff --git a/build.sbt b/build.sbt index 5f596446ba..bc3fbf9cf7 100644 --- a/build.sbt +++ b/build.sbt @@ -135,7 +135,10 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq( "fs2.io.file.Files._runJavaCollectionResource" ), ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.file.Files.list"), - ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.file.Files.watch") + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.file.Files.watch"), + ProblemFilters.exclude[MissingClassProblem]("fs2.Pull$MapOutput$"), + ProblemFilters.exclude[MissingClassProblem]("fs2.Pull$MapOutput"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.Pull.mapOutput") ) lazy val root = project diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 1d29d8d58d..0055da152b 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -25,7 +25,6 @@ import scala.annotation.{nowarn, tailrec} import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal -import cats.data.AndThen import cats.{Eval => _, _} import cats.effect.kernel._ import cats.syntax.all._ @@ -708,11 +707,6 @@ object Pull extends PullLowPriority { fk: G ~> F ) extends Action[F, O, Unit] - private final case class MapOutput[+F[_], O, +P]( - stream: Pull[F, O, Unit], - fun: AndThen[O, P] - ) extends Action[F, P, Unit] - private final case class FlatMapOutput[+F[_], O, +P]( stream: Pull[F, O, Unit], fun: O => Pull[F, P, Unit] @@ -932,11 +926,6 @@ object Pull extends PullLowPriority { def fail(e: Throwable): F[End] = goErr(e, view) } - def goMapOutput[Z](mout: MapOutput[G, Z, X], view: Cont[Unit, G, X]): F[End] = { - val mapRun = new MapOutR(view, mout.fun) - go(scope, extendedTopLevelScope, translation, mapRun, mout.stream) - } - abstract class StepRunR[Y, S](view: Cont[Option[S], G, X]) extends Run[G, Y, F[End]] { def done(scope: Scope[F]): F[End] = interruptGuard(scope, view) { @@ -977,30 +966,6 @@ object Pull extends PullLowPriority { F.unit >> go(scope, extendedTopLevelScope, translation, new FlatMapR(view, fmout.fun), fmout.stream) - class MapOutR[Y](view: Cont[Unit, G, X], fun: AndThen[Y, X]) extends Run[G, Y, F[End]] { - - def done(scope: Scope[F]): F[End] = - go(scope, extendedTopLevelScope, translation, runner, view(unit)) - - def interrupted(inter: Interrupted): F[End] = - go(scope, extendedTopLevelScope, translation, runner, view(inter)) - - def fail(e: Throwable): F[End] = goErr(e, view) - - def out(head: Chunk[Y], scope: Scope[F], tail: Pull[G, Y, Unit]): F[End] = - try { - val mappedHead: Chunk[X] = head.map(fun) - val mappedTail = mapOutput(tail, fun) - val handledTail = transformWith(mappedTail) { - case interruption @ Interrupted(_, _) => - MapOutput[G, Y, X](interruptBoundary(tail, interruption), fun) - case r => r - } - val next = bindView(handledTail, view) - runner.out(mappedHead, scope, next) - } catch { case NonFatal(e) => fail(e) } - } - class FlatMapR[Y](view: Cont[Unit, G, X], fun: Y => Pull[G, X, Unit]) extends Run[G, Y, F[End]] { private[this] def unconsed(chunk: Chunk[Y], tail: Pull[G, Y, Unit]): Pull[G, X, Unit] = @@ -1222,12 +1187,11 @@ object Pull extends PullLowPriority { val result = Succeeded(scope.asInstanceOf[y]) go(scope, extendedTopLevelScope, translation, runner, view(result)) - case mout: MapOutput[g, z, _] => goMapOutput[z](mout, view) - case eval: Eval[G, r] => goEval[r](eval, view) - case acquire: Acquire[G, y] => goAcquire(acquire, view) - case inScope: InScope[g, _] => goInScope(inScope.stream, inScope.useInterruption, view) - case int: InterruptWhen[g] => goInterruptWhen(translation(int.haltOnSignal), view) - case close: CloseScope => goCloseScope(close, view) + case eval: Eval[G, r] => goEval[r](eval, view) + case acquire: Acquire[G, y] => goAcquire(acquire, view) + case inScope: InScope[g, _] => goInScope(inScope.stream, inScope.useInterruption, view) + case int: InterruptWhen[g] => goInterruptWhen(translation(int.haltOnSignal), view) + case close: CloseScope => goCloseScope(close, view) } case _: Succeeded[_] => runner.done(scope) case failed: Fail => runner.fail(failed.error) @@ -1292,16 +1256,23 @@ object Pull extends PullLowPriority { /* Applies the outputs of this pull to `f` and returns the result in a new `Pull`. */ private[fs2] def mapOutput[F[_], O, P]( - stream: Pull[F, O, Unit], - fun: O => P + s: Stream[F, O], + f: O => P ): Pull[F, P, Unit] = - stream match { - case a: AlgEffect[F, _] => a - case t: Translate[g, f, _] => Translate[g, f, P](mapOutput(t.stream, fun), t.fk) - case m: MapOutput[f, q, o] => MapOutput(m.stream, m.fun.andThen(fun)) - case r: Terminal[_] => r - case _ => MapOutput(stream, AndThen(fun)) - } + interruptScope(mapOutputNoScope(s, f)) + + /** Like `mapOutput` but does not insert an interruption scope. */ + private[fs2] def mapOutputNoScope[F[_], O, P]( + s: Stream[F, O], + f: O => P + ): Pull[F, P, Unit] = { + def go(s: Stream[F, O]): Pull[F, P, Unit] = + s.pull.uncons.flatMap { + case None => Pull.done + case Some((hd, tl)) => Pull.output(hd.map(f)) >> go(tl) + } + go(s) + } private[this] def transformWith[F[_], O, R, S](p: Pull[F, O, R])( f: Terminal[R] => Pull[F, O, S] diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index f9c5f84cde..68e181d209 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1717,7 +1717,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * }}} */ def map[O2](f: O => O2): Stream[F, O2] = - Pull.mapOutput(underlying, f).streamNoScope + Pull.mapOutput(this, f).streamNoScope + + private def mapNoScope[O2](f: O => O2): Stream[F, O2] = + Pull.mapOutputNoScope(this, f).streamNoScope /** Maps a running total according to `S` and the input with the function `f`. * @@ -2677,10 +2680,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Pull.output(hd.map(o2 => f(pad1, o2))) >> contRight(tl) def contLeft(s: Stream[F2, O2]): Pull[F2, O4, Unit] = - Pull.mapOutput(s.pull.echo, f(_, pad2)) + Pull.mapOutputNoScope(s, f(_, pad2)) def contRight(s: Stream[F2, O3]): Pull[F2, O4, Unit] = - Pull.mapOutput(s.pull.echo, f(pad1, _)) + Pull.mapOutputNoScope(s, f(pad1, _)) zipWith_[F2, O2, O3, O4](that)(cont1, cont2, contRight)(f) } @@ -3522,7 +3525,7 @@ object Stream extends StreamLowPriority { .bracketFullWeak(resource) { case ((_, release), exit) => release(exit) } - .map(_._1) + .mapNoScope(_._1) case Resource.Bind(source, f) => resourceWeak(source).flatMap(o => resourceWeak(f(o))) case Resource.Eval(fo) => Stream.eval(fo)