Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Stream#noInterruptScope #2843

Closed
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,12 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.compression.Compression.gunzip$default$1$"
),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.ChunkCompanionPlatform.makeArrayBuilder")
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.ChunkCompanionPlatform.makeArrayBuilder"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("fs2.Pull#InScope.useInterruption"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.Pull#InScope.copy"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("fs2.Pull#InScope.copy$default$2"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.Pull#InScope.this"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.Pull#InScope.apply")
)

lazy val root = tlCrossRootProject
Expand Down
15 changes: 11 additions & 4 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ object Pull extends PullLowPriority {

private final case class InScope[+F[_], +O](
stream: Pull[F, O, Unit],
useInterruption: Boolean
useInterruption: Scope.Interrupt
) extends Action[F, O, Unit]

private final case class InterruptWhen[+F[_]](haltOnSignal: F[Either[Throwable, Unit]])
Expand Down Expand Up @@ -817,12 +817,19 @@ object Pull extends PullLowPriority {
/** Wraps supplied pull in new scope, that will be opened before this pull is evaluated
* and closed once this pull either finishes its evaluation or when it fails.
*/
private[fs2] def scope[F[_], O](s: Pull[F, O, Unit]): Pull[F, O, Unit] = InScope(s, false)
private[fs2] def scope[F[_], O](s: Pull[F, O, Unit]): Pull[F, O, Unit] =
InScope(s, Scope.Interrupt.Inherited)

/** Like `scope` but allows this scope to be interrupted.
* Note that this may fail with `Interrupted` when interruption occurred
*/
private[fs2] def interruptScope[F[_], O](s: Pull[F, O, Unit]): Pull[F, O, Unit] = InScope(s, true)
private[fs2] def interruptScope[F[_], O](s: Pull[F, O, Unit]): Pull[F, O, Unit] =
InScope(s, Scope.Interrupt.Enabled)

/** Like `scope` but prevents this scope from being interrupted.
*/
private[fs2] def noInterruptScope[F[_], O](s: Pull[F, O, Unit]): Pull[F, O, Unit] =
InScope(s, Scope.Interrupt.Disabled)

private[fs2] def interruptWhen[F[_], O](
haltOnSignal: F[Either[Throwable, Unit]]
Expand Down Expand Up @@ -1098,7 +1105,7 @@ object Pull extends PullLowPriority {

def goInScope(
stream: Pull[G, X, Unit],
useInterruption: Boolean,
useInterruption: Scope.Interrupt,
view: Cont[Unit, G, X]
): F[B] = {
def endScope(scopeId: Unique.Token, result: Terminal[Unit]): Pull[G, X, Unit] =
Expand Down
5 changes: 5 additions & 0 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1976,6 +1976,11 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
): Stream[F2, O2] =
that.mergeHaltL(this)

/** Creates a scope that cannot be interrupted.
*/
def noInterruptScope: Stream[F, O] =
new Stream(Pull.noInterruptScope(underlying))

/** Emits each output wrapped in a `Some` and emits a `None` at the end of the stream.
*
* `s.noneTerminate.unNoneTerminate == s`
Expand Down
31 changes: 24 additions & 7 deletions core/shared/src/main/scala/fs2/internal/Scope.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,16 @@ private[fs2] final class Scope[F[_]] private (
*
* Returns scope that has to be used in next compilation step.
*/
def open(interruptible: Boolean): F[Either[Throwable, Scope[F]]] = {
def open(interruptible: Scope.Interrupt): F[Either[Throwable, Scope[F]]] = {
/*
* Creates a context for a new scope.
*
* We need to differentiate between three states:
* We need to differentiate between four states:
* The new scope is not interruptible -
* It should ignore the interrupt of the current scope. But it should not
* close the listening on parent scope close when the new scope will close.
*
* The new scope inherits interruptibility -
* It should respect the interrupt of the current scope. But it should not
* close the listening on parent scope close when the new scope will close.
*
Expand All @@ -132,16 +136,22 @@ private[fs2] final class Scope[F[_]] private (
*
*/
val createScope: F[Scope[F]] = F.unique.flatMap { newScopeId =>
self.interruptible match {
val optFCtx = self.interruptible match {
case None =>
val optFCtx = if (interruptible) F.interruptContext(newScopeId) else None
optFCtx.sequence.flatMap(iCtx => Scope[F](newScopeId, Some(self), iCtx))
interruptible match {
case Scope.Interrupt.Enabled => F.interruptContext(newScopeId)
case Scope.Interrupt.Inherited | Scope.Interrupt.Disabled => None
}

case Some(parentICtx) =>
parentICtx.childContext(interruptible, newScopeId).flatMap { iCtx =>
Scope[F](newScopeId, Some(self), Some(iCtx))
interruptible match {
case Scope.Interrupt.Enabled => Some(parentICtx.childContext(true, newScopeId))
case Scope.Interrupt.Inherited => Some(parentICtx.childContext(false, newScopeId))
case Scope.Interrupt.Disabled => None
}
}

optFCtx.sequence.flatMap(iCtx => Scope[F](newScopeId, Some(self), iCtx))
}

createScope.flatMap { scope =>
Expand Down Expand Up @@ -485,6 +495,13 @@ private[fs2] final class Scope[F[_]] private (

private[fs2] object Scope {

sealed abstract class Interrupt
object Interrupt {
object Disabled extends Interrupt
object Enabled extends Interrupt
object Inherited extends Interrupt
}

private def apply[F[_]](
id: Unique.Token,
parent: Option[Scope[F]],
Expand Down
13 changes: 12 additions & 1 deletion core/shared/src/test/scala/fs2/StreamInterruptSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package fs2

import scala.concurrent.duration._

import cats.effect.{IO, Sync}
import cats.effect.{IO, IOLocal, Sync}
import cats.effect.kernel.Deferred
import cats.effect.std.Semaphore
import org.scalacheck.effect.PropF.forAllF
Expand Down Expand Up @@ -369,4 +369,15 @@ class StreamInterruptSuite extends Fs2Suite {
val interrupt = IO.sleep(250.millis)
IO.race(compileWithSync(s).drain, interrupt).map(_.isRight).assert
}

test("issue #2842 - no interrupt scope") {
val s = for {
local <- Stream.eval(IOLocal(List.empty[Int]))
_ <- Stream.eval(local.update(1 :: _)).noInterruptScope
_ <- Stream.eval(local.update(2 :: _)) // this one will be lost on a forked fiber
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know whether this is a good idea or not, but it makes the test pass.

Suggested change
_ <- Stream.eval(local.update(2 :: _)) // this one will be lost on a forked fiber
_ <- Stream.eval(local.update(2 :: _)).interruptScope // this one will be lost on a forked fiber

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that works. Going to your example in armanbilge/bayou#1 (comment) it would be like doing this I think:

- x  <- (spanS[IO]("stream") >> s2).interruptWhen(d).compile.foldMonoid
+ x  <- (spanS[IO]("stream") >> s2.interruptScope).interruptWhen(d).compile.foldMonoid

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, actually I'm not sure. Because even if s2 is interruptible I don't think it would respect the interruptibility of the outer Stream since it doesn't know about that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got called away to a meeting before I could share and again before I can refine, but I'm trying to define a spanS. This passes. (Both interrupt scopes aren't necessary, but the one in spanS is necessary when it doesn't know what it's spanning.)

  test("spanS -- interrupted") {
    IOLocal(0).flatMap { local =>
      def spanS: Stream[IO, Unit] =
        Stream.bracket(local.set(1))(_ => local.set(0))
          .noInterruptScope
          .interruptScope

      ((spanS >> Stream.eval(local.get)) ++ Stream.eval(local.get))
        .interruptScope.compile.toList
    }.assertEquals(List(1, 0))
  }

Copy link
Member

@rossabaker rossabaker Mar 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understand your comment on the outer stream. We don't want to go noInterrupt-interrupt. We want to go noInterrupt-reset. My spanS above would make an uninterruptible-by-inheritance stream interruptible to trace it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, actually it doesn't matter at all whether the Stream we want to trace is interruptible or not. Really, we shouldn't be messing with that. In fact, interruptibility is sort of convoluted with the real problem here.

All we want is a way to call local.set(...) on the fiber will parent every other forked fiber in that Stream. (And ideally a way to do it again, at the end of the Stream when we close the Span). Because interruptibility is using race is why it becomes important in this context, but these are brittle sort of implementation details. And there could be other similar gotchas throughout fs2.

Since IOLocal is specific to IO, I wonder if we just need to add some IO specific methods to Stream specifically for working with locals. Because currently compositionality is kind of broken for anything to do with IOLocal.

val s = Stream.eval(local.set(...)) >> Stream.eval(local.get)
s.compile.lastOrError =!= s.interruptScope.compile.lastOrError

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lazy question: does fs2 even depend on IO? Wouldn't it need to put some sort of hint into the algebra that an IO interpreter was aware of?

Copy link
Member Author

@armanbilge armanbilge Mar 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fs2 depends on CE core because of SyncIO but there's no direct dependency to IO that I'm aware of anyway.

I don't think that the interpreter needs to be aware of IO. Roughly I was thinking we could have some (ugly) method like this:

object Stream {
  // edit: this signature needs to be re-thought, for sure ...
  def locally[A, B](local: IOLocal[A])(IOLocal[A] => IO[B]): Stream[IO, B]
}

that must satisfy something like

val s = Stream.locally(local)(_.set(...)) >> Stream.locally(local)(_.get)
s.compile.lastOrError === s.interruptScope.compile.lastOrError

The implementation can delegate to something like Stream.evalUninterruptible which can be a private[fs2] method. So that way we don't expose it, but also the algebra doesn't need to know about IO. The fact that the desired semantics are achieved by disabling interruption is remains an implementation detail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds reasonable, but it also means we can't derive spanS from the proposed TraceResource and would need a third type class:

classDiagram
    Trace <|-- TraceResource
    TraceResource <|-- TraceStream
      class Trace{
        +span(String name)(F~A~ fa) F~A~
      }
      class TraceResource{
        +spanR(String name) Resource~F~
      }
      class TraceStream{
        +spanS(String name) Stream~F~
      }
Loading

Sorry, I don't know enough Mermaid UML™ to render the output type of Resource and Stream, but you get the gist.

Or maybe TraceResource could have an abstract type parameter, and fs2 could have a Locally type class that relates IO and IOLocal, but now I'm feeling even dizzier.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This is what I was getting at in armanbilge/bayou#1 (comment) and typelevel/natchez#526 (comment): we will need a spanS method implemented specifically for Stream and thus would need to take on an fs2 dependency. This would be true even with my original noInterruptScope idea.

I don't know if TraceResource and TraceStream or more typeclasses are the right answer 😅 starts to get complicated. I'd rather have an enhanced Trace and take on the fs2 dependency which is kind of what Bayou purported anyway.

_ <- Stream.eval(local.update(3 :: _)).noInterruptScope
result <- Stream.eval(local.get)
} yield result
s.interruptScope.compile.lastOrError.assertEquals(List(3, 1))
}
Comment on lines +373 to +382
Copy link
Member Author

@armanbilge armanbilge Mar 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what I was concerned about—that the noInterruptScope would leak to other portions of the Stream. This test currently fails, because Stream.eval(local.update(2 :: _)) is not interruptible even though it should be.

}