-
Notifications
You must be signed in to change notification settings - Fork 605
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce Stream#noInterruptScope
#2843
Closed
Closed
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
56582bf
Add test for https://github.com/typelevel/fs2/issues/2842
armanbilge 5ae60b9
Add Interruption ADT
armanbilge d68dce6
Pull.Interruption -> Scope.Interrupt
armanbilge deed913
Use new Scope.Interrupt ADT
armanbilge 5fac112
Add support for Scope.Interrupt.Disabled
armanbilge 6054e5d
Add Stream#noInterruptScope
armanbilge 814fd3a
It works :)
armanbilge 7e9081f
Add MiMa filters
armanbilge 379ca42
Update implementation notes
armanbilge ce014f6
Make the test case more interesting
armanbilge File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,7 +23,7 @@ package fs2 | |
|
||
import scala.concurrent.duration._ | ||
|
||
import cats.effect.{IO, Sync} | ||
import cats.effect.{IO, IOLocal, Sync} | ||
import cats.effect.kernel.Deferred | ||
import cats.effect.std.Semaphore | ||
import org.scalacheck.effect.PropF.forAllF | ||
|
@@ -369,4 +369,15 @@ class StreamInterruptSuite extends Fs2Suite { | |
val interrupt = IO.sleep(250.millis) | ||
IO.race(compileWithSync(s).drain, interrupt).map(_.isRight).assert | ||
} | ||
|
||
test("issue #2842 - no interrupt scope") { | ||
val s = for { | ||
local <- Stream.eval(IOLocal(List.empty[Int])) | ||
_ <- Stream.eval(local.update(1 :: _)).noInterruptScope | ||
_ <- Stream.eval(local.update(2 :: _)) // this one will be lost on a forked fiber | ||
_ <- Stream.eval(local.update(3 :: _)).noInterruptScope | ||
result <- Stream.eval(local.get) | ||
} yield result | ||
s.interruptScope.compile.lastOrError.assertEquals(List(3, 1)) | ||
} | ||
Comment on lines
+373
to
+382
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is what I was concerned about—that the |
||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know whether this is a good idea or not, but it makes the test pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that works. Going to your example in armanbilge/bayou#1 (comment) it would be like doing this I think:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, actually I'm not sure. Because even if s2 is interruptible I don't think it would respect the interruptibility of the outer
Stream
since it doesn't know about that.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got called away to a meeting before I could share and again before I can refine, but I'm trying to define a
spanS
. This passes. (Both interrupt scopes aren't necessary, but the one in spanS is necessary when it doesn't know what it's spanning.)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I understand your comment on the outer stream. We don't want to go noInterrupt-interrupt. We want to go noInterrupt-reset. My
spanS
above would make an uninterruptible-by-inheritance stream interruptible to trace it.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, actually it doesn't matter at all whether the
Stream
we want to trace is interruptible or not. Really, we shouldn't be messing with that. In fact, interruptibility is sort of convoluted with the real problem here.All we want is a way to call
local.set(...)
on the fiber will parent every other forked fiber in thatStream
. (And ideally a way to do it again, at the end of theStream
when we close theSpan
). Because interruptibility is usingrace
is why it becomes important in this context, but these are brittle sort of implementation details. And there could be other similar gotchas throughout fs2.Since
IOLocal
is specific toIO
, I wonder if we just need to add someIO
specific methods toStream
specifically for working with locals. Because currently compositionality is kind of broken for anything to do withIOLocal
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lazy question: does fs2 even depend on
IO
? Wouldn't it need to put some sort of hint into the algebra that anIO
interpreter was aware of?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fs2 depends on CE core because of
SyncIO
but there's no direct dependency toIO
that I'm aware of anyway.I don't think that the interpreter needs to be aware of
IO
. Roughly I was thinking we could have some (ugly) method like this:that must satisfy something like
The implementation can delegate to something like
Stream.evalUninterruptible
which can be aprivate[fs2]
method. So that way we don't expose it, but also the algebra doesn't need to know aboutIO
. The fact that the desired semantics are achieved by disabling interruption is remains an implementation detail.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds reasonable, but it also means we can't derive
spanS
from the proposedTraceResource
and would need a third type class:Sorry, I don't know enough Mermaid UML™ to render the output type of
Resource
andStream
, but you get the gist.Or maybe
TraceResource
could have an abstract type parameter, and fs2 could have aLocally
type class that relatesIO
andIOLocal
, but now I'm feeling even dizzier.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. This is what I was getting at in armanbilge/bayou#1 (comment) and typelevel/natchez#526 (comment): we will need a
spanS
method implemented specifically forStream
and thus would need to take on anfs2
dependency. This would be true even with my originalnoInterruptScope
idea.I don't know if
TraceResource
andTraceStream
or more typeclasses are the right answer 😅 starts to get complicated. I'd rather have an enhancedTrace
and take on the fs2 dependency which is kind of what Bayou purported anyway.