-
Notifications
You must be signed in to change notification settings - Fork 605
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce Stream#noInterruptScope
#2843
Conversation
Woops?
Edit: can't reproduce locally, a flake? 😓 Edit 2: right, it also happened in #2831 (comment). |
test("issue #2842 - no interrupt scope") { | ||
val s = for { | ||
local <- Stream.eval(IOLocal(List.empty[Int])) | ||
_ <- Stream.eval(local.update(1 :: _)).noInterruptScope | ||
_ <- Stream.eval(local.update(2 :: _)) // this one will be lost on a forked fiber | ||
_ <- Stream.eval(local.update(3 :: _)).noInterruptScope | ||
result <- Stream.eval(local.get) | ||
} yield result | ||
s.interruptScope.compile.lastOrError.assertEquals(List(3, 1)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what I was concerned about—that the noInterruptScope
would leak to other portions of the Stream
. This test currently fails, because Stream.eval(local.update(2 :: _))
is not interruptible even though it should be.
Ok, I'm afraid this particular approach may be a dead end. I didn't think hard enough about how scopes work and It suffers from exactly the problem @rossabaker asked about in armanbilge/bayou#1 (comment), which is that the no-interruptibility scope extends over the Of course, a |
val s = for { | ||
local <- Stream.eval(IOLocal(List.empty[Int])) | ||
_ <- Stream.eval(local.update(1 :: _)).noInterruptScope | ||
_ <- Stream.eval(local.update(2 :: _)) // this one will be lost on a forked fiber |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know whether this is a good idea or not, but it makes the test pass.
_ <- Stream.eval(local.update(2 :: _)) // this one will be lost on a forked fiber | |
_ <- Stream.eval(local.update(2 :: _)).interruptScope // this one will be lost on a forked fiber |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that works. Going to your example in armanbilge/bayou#1 (comment) it would be like doing this I think:
- x <- (spanS[IO]("stream") >> s2).interruptWhen(d).compile.foldMonoid
+ x <- (spanS[IO]("stream") >> s2.interruptScope).interruptWhen(d).compile.foldMonoid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, actually I'm not sure. Because even if s2 is interruptible I don't think it would respect the interruptibility of the outer Stream
since it doesn't know about that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got called away to a meeting before I could share and again before I can refine, but I'm trying to define a spanS
. This passes. (Both interrupt scopes aren't necessary, but the one in spanS is necessary when it doesn't know what it's spanning.)
test("spanS -- interrupted") {
IOLocal(0).flatMap { local =>
def spanS: Stream[IO, Unit] =
Stream.bracket(local.set(1))(_ => local.set(0))
.noInterruptScope
.interruptScope
((spanS >> Stream.eval(local.get)) ++ Stream.eval(local.get))
.interruptScope.compile.toList
}.assertEquals(List(1, 0))
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I understand your comment on the outer stream. We don't want to go noInterrupt-interrupt. We want to go noInterrupt-reset. My spanS
above would make an uninterruptible-by-inheritance stream interruptible to trace it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, actually it doesn't matter at all whether the Stream
we want to trace is interruptible or not. Really, we shouldn't be messing with that. In fact, interruptibility is sort of convoluted with the real problem here.
All we want is a way to call local.set(...)
on the fiber will parent every other forked fiber in that Stream
. (And ideally a way to do it again, at the end of the Stream
when we close the Span
). Because interruptibility is using race
is why it becomes important in this context, but these are brittle sort of implementation details. And there could be other similar gotchas throughout fs2.
Since IOLocal
is specific to IO
, I wonder if we just need to add some IO
specific methods to Stream
specifically for working with locals. Because currently compositionality is kind of broken for anything to do with IOLocal
.
val s = Stream.eval(local.set(...)) >> Stream.eval(local.get)
s.compile.lastOrError =!= s.interruptScope.compile.lastOrError
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lazy question: does fs2 even depend on IO
? Wouldn't it need to put some sort of hint into the algebra that an IO
interpreter was aware of?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fs2 depends on CE core because of SyncIO
but there's no direct dependency to IO
that I'm aware of anyway.
I don't think that the interpreter needs to be aware of IO
. Roughly I was thinking we could have some (ugly) method like this:
object Stream {
// edit: this signature needs to be re-thought, for sure ...
def locally[A, B](local: IOLocal[A])(IOLocal[A] => IO[B]): Stream[IO, B]
}
that must satisfy something like
val s = Stream.locally(local)(_.set(...)) >> Stream.locally(local)(_.get)
s.compile.lastOrError === s.interruptScope.compile.lastOrError
The implementation can delegate to something like Stream.evalUninterruptible
which can be a private[fs2]
method. So that way we don't expose it, but also the algebra doesn't need to know about IO
. The fact that the desired semantics are achieved by disabling interruption is remains an implementation detail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds reasonable, but it also means we can't derive spanS
from the proposed TraceResource
and would need a third type class:
classDiagram
Trace <|-- TraceResource
TraceResource <|-- TraceStream
class Trace{
+span(String name)(F~A~ fa) F~A~
}
class TraceResource{
+spanR(String name) Resource~F~
}
class TraceStream{
+spanS(String name) Stream~F~
}
Sorry, I don't know enough Mermaid UML™ to render the output type of Resource
and Stream
, but you get the gist.
Or maybe TraceResource
could have an abstract type parameter, and fs2 could have a Locally
type class that relates IO
and IOLocal
, but now I'm feeling even dizzier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. This is what I was getting at in armanbilge/bayou#1 (comment) and typelevel/natchez#526 (comment): we will need a spanS
method implemented specifically for Stream
and thus would need to take on an fs2
dependency. This would be true even with my original noInterruptScope
idea.
I don't know if TraceResource
and TraceStream
or more typeclasses are the right answer 😅 starts to get complicated. I'd rather have an enhanced Trace
and take on the fs2 dependency which is kind of what Bayou purported anyway.
I wonder whether something like this could help make this test pass, but I haven't made it click yet. I want the finalizer on Deferred[IO, Boolean].flatMap { d =>
val s = Stream.never[IO].onFinalize(d.complete(true).attempt.void)
IO.uncancelable { poll =>
IO.canceled *> poll(s.compile.drain)
}.onCancel(d.complete(false).attempt.void).start *> d.get
}.assert |
Ross found a better way to do tracing, which was the motivating use-case for this method. |
Opening for CI, let's see if I broke anything 😅