Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Stream#concurrently for short-circuiting monad transformers #2652

Merged
merged 3 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -548,11 +548,11 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
} yield {
def watch[A](str: Stream[F2, A]) = str.interruptWhen(interrupt.get.attempt)

val compileBack: F2[Boolean] = watch(that).compile.drain.attempt.flatMap {
val compileBack: F2[Unit] = watch(that).compile.drain.guaranteeCase {
// Pass the result of backstream completion in the backResult deferred.
// IF result of back-stream was failed, interrupt fore. Otherwise, let it be
case r @ Right(_) => backResult.complete(r)
case l @ Left(_) => backResult.complete(l) >> interrupt.complete(())
case Outcome.Errored(t) => backResult.complete(Left(t)) >> interrupt.complete(()).void
case _ => backResult.complete(Right(())).void
}

// stop background process but await for it to finalise with a result
Expand Down
20 changes: 20 additions & 0 deletions core/shared/src/test/scala/fs2/StreamConcurrentlySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package fs2

import scala.concurrent.duration._

import cats.data.EitherT
import cats.effect.IO
import cats.effect.kernel.{Deferred, Ref}
import cats.effect.std.Semaphore
Expand Down Expand Up @@ -170,4 +171,23 @@ class StreamConcurrentlySuite extends Fs2Suite {
.lastOrError
.map(cnt => assert(cnt >= iterations, s"cnt: $cnt, iterations: $iterations"))
}

test("background stream completes with short-circuiting transformers") {
Stream(1, 2, 3)
.concurrently(Stream.eval(EitherT.leftT[IO, Int]("left")))
.compile
.lastOrError
.value
.assertEquals(Right(3))
}

test("foreground stream short-circuits") {
Stream(1, 2, 3)
.evalMap(n => EitherT.cond[IO](n % 2 == 0, n, "left"))
.concurrently(Stream.eval(EitherT.rightT[IO, String](42)))
.compile
.lastOrError
.value
.assertEquals(Left("left"))
}
}
12 changes: 11 additions & 1 deletion io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package fs2
package io

import cats.data.EitherT
import cats.effect.{IO, Resource}
import cats.effect.unsafe.{IORuntime, IORuntimeConfig}
import fs2.{Err, Fs2Suite}
Expand Down Expand Up @@ -183,7 +184,8 @@ class IoPlatformSuite extends Fs2Suite {
}

test("can copy more than Int.MaxValue bytes") {
// Unit test adapted from the original issue reproduction at https://github.com/mrdziuban/fs2-writeOutputStream.
// Unit test adapted from the original issue reproduction at
// https://github.com/mrdziuban/fs2-writeOutputStream.

val byteStream =
Stream
Expand All @@ -203,5 +205,13 @@ class IoPlatformSuite extends Fs2Suite {
.compile
.drain
}

test("works with short-circuiting monad transformers") {
// Unit test adapted from the original issue reproduction at
// https://github.com/mrdziuban/fs2-readOutputStream-EitherT.

readOutputStream(1)(_ => EitherT.left[Unit](IO.unit)).compile.drain.value
.timeout(5.seconds)
}
}
}