Fixing interruption behaviour #3183
Conversation
* adding error & interruption propagation and integrity test
How does this PR relate to #3186? Should this be reviewed first?
I ended up making all the changes in #3186, since I wanted to verify the correctness of that implementation, but I'm happy for this one to be considered first. I'll sync up the branch, run
val downstream = source.groupWithin(100, 2.seconds)

downstream.intercept[SevenNotAllowed.type]
Should we make an assertion here about what the downstream has / has not received before the error?
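One possible shape for such an assertion, as a rough sketch (hypothetical: it reuses the test's source and SevenNotAllowed, and assumes the upstream fails on the element 7 after emitting smaller values):

```scala
// Rough sketch of such an assertion (hypothetical names and expectation).
test("groupWithin emits buffered elements before propagating the error") {
  IO.ref(Vector.empty[Int]).flatMap { seen =>
    source
      .groupWithin(100, 2.seconds)
      .evalTap(chunk => seen.update(_ ++ chunk.toVector)) // record pre-error output
      .compile
      .drain
      .attempt
      .flatMap {
        case Left(SevenNotAllowed) => seen.get.map(out => assert(out.forall(_ < 7)))
        case other                 => IO(fail(s"expected SevenNotAllowed, got $other"))
      }
  }
}
```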
.timeout(downstreamTimeout)
.flatTap(_ => IO.monotonic.flatMap(ref.set))
.flatMap(emit => ref.get.map(timeLapsed => (timeLapsed, emit)))
Suggested change:

.timeout(downstreamTimeout)
.timed
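For reference, IO#timed in cats-effect already pairs the result with its elapsed time, which is what makes the Ref / IO.monotonic bookkeeping above unnecessary. A minimal sketch (the assertion values are made up):

```scala
// Sketch only: .timed yields IO[(FiniteDuration, A)], replacing the Ref/IO.monotonic dance.
downstream.compile.lastOrError
  .timeout(downstreamTimeout)
  .timed
  .map { case (elapsed, emit) =>
    // hypothetical assertions; the real test compares against its own expected values
    assert(elapsed >= 1.second)
    assertEquals(emit, Chunk(1, 2, 3))
  }
```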
Co-authored-by: Arman Bilge <armanbilge@gmail.com>
@@ -874,6 +874,7 @@ class StreamCombinatorsSuite extends Fs2Suite {
source.groupWithin(Int.MaxValue, 1.day)

downstream.compile.lastOrError
  .timeout(downstreamTimeout)
Why is this timeout necessary, since it's an executeEmbed test?
It's because if the test fails we get a slightly better error message:

java.util.concurrent.TimeoutException: 7500 milliseconds

which can be easily associated with downstreamTimeout. Otherwise we get this value in the diff, which looks a bit random:

_1 = 86405500000000 nanoseconds,

but I'm happy to remove it.
Got it, that is a nicer error :) thanks!
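To make the trade-off concrete, a sketch of the pattern discussed above (downstream and the 7500ms value come from this thread; TestControl.executeEmbed is the cats-effect-testkit runner the question refers to):

```scala
import cats.effect.testkit.TestControl

// Sketch: under executeEmbed the sleep inside timeout is advanced instantly, so the
// explicit timeout costs nothing, but a hung downstream now fails with
// "TimeoutException: 7500 milliseconds", which points straight at downstreamTimeout.
val downstreamTimeout = 7500.millis

TestControl.executeEmbed(
  downstream.compile.lastOrError
    .timeout(downstreamTimeout)
    .timed
)
```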
def endSupply(result: Either[Throwable, Unit]): F2[Unit] =
-  buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(Int.MaxValue)
+  buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(
+    Int.MaxValue + outputLong
+  )
Sorry, dumb question: why is Int.MaxValue a "magic number" in this context? I would have thought it's effectively maxing out the semaphore, but if it needs + outputLong to work then I feel like it must have more significance?
Legit question to be fair. Had to think about it again.

Interruption of the upstream fiber (i.e. Outcome.Cancelled) is handled downstream by doing nothing (permits are never released). So by increasing the supply to Int.MaxValue we are just evening out the negative balance (Int.MaxValue is to account for the worst case scenario: at most the chunkSize parameter will be equal to Int.MaxValue).

val waitSupply = supply.acquireN(outputLong).guaranteeCase {
  case Outcome.Succeeded(_) => supply.releaseN(outputLong)
  case _                    => F.unit
}

Now, after getting past the "checkpoint" above, we are acquiring outputLong permits again:

acq <- F.race(F.sleep(timeout), waitSupply).flatMap {
  case Left(_)  => onTimeout
  case Right(_) => supply.acquireN(outputLong).as(outputLong)
}

So in order to get past this point we need to release an additional outputLong permits, and that allows the stream to be unblocked.

EDIT

> Interruption of the upstream fiber (i.e. Outcome.Cancelled)

uhm, well, actually I've just tested it, and it is not handled with Outcome.Cancelled...
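To make the quoted "checkpoint" shape concrete outside the groupWithin internals, here is a small standalone sketch (Semaphore[IO] stands in for supply; this is an illustration of the pattern, not the actual fs2 code):

```scala
import cats.effect.{IO, Outcome}
import cats.effect.std.Semaphore

// Sketch: the first acquireN only waits until n permits exist and hands them back on
// success, so passing the checkpoint consumes nothing; the second acquireN is the one
// that actually spends n permits from the supply.
def checkpointThenTake(supply: Semaphore[IO], n: Long): IO[Unit] = {
  val waitSupply = supply.acquireN(n).guaranteeCase {
    case Outcome.Succeeded(_) => supply.releaseN(n) // give the permits straight back
    case _                    => IO.unit            // cancelled / errored: do nothing
  }
  waitSupply *> supply.acquireN(n) // net consumption: n permits
}
```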
Thanks for that explanation!

> Int.MaxValue is to account for the worst case scenario: at most the chunkSize parameter will be equal to Int.MaxValue

So could we just use chunkSize here, instead of Int.MaxValue?
@armanbilge apologies I was wrong, that's not what's happening here. I'm just doing some tests to figure out why we need the additional outputLong
Btw, if these implementation details are no longer relevant after your rewrite in the other PR, then let's not get too hung up on this one :)
Ok, I think I've figured it out (might be useful for the other implementation actually).

Basically the problem is that we need enough supply to cover 2 iterations of the race loop. So if we only increase it by Int.MaxValue, the following will happen:
- (current iteration): supply is unblocked
- (next iteration): supply gets stuck (not enough supply because upstream was interrupted)

If instead we increase it by Int.MaxValue + outputLong:
- (current iteration): supply is unblocked
- (next iteration): supply is not blocked, thanks to the additional outputLong

So since the chunkSize can be as high as Int.MaxValue, the minimum supply to unblock the semaphore should be Int.MaxValue + outputLong.
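A hypothetical standalone repro of that accounting (not fs2 code; n plays the role of outputLong, and the loop body is simplified in that it always takes n permits after the race, unlike the real code, which goes to onTimeout on the Left branch): releasing only n permits lets the first iteration through but deadlocks the second, while 2 * n lets both complete.

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Semaphore
import scala.concurrent.duration._

object TwoIterationsRepro extends IOApp.Simple {
  val n = 3L // stand-in for outputLong

  // One iteration of the race loop: check that n permits are available (and give them
  // back), then consume n permits for real.
  def iteration(supply: Semaphore[IO]): IO[Unit] =
    IO.race(IO.sleep(10.millis), supply.acquireN(n) *> supply.releaseN(n)) *>
      supply.acquireN(n)

  def run: IO[Unit] =
    Semaphore[IO](0).flatMap { supply =>
      // With releaseN(n) instead of releaseN(2 * n), the second iteration hangs.
      supply.releaseN(2 * n) *> iteration(supply) *> iteration(supply) *>
        IO.println("both iterations completed")
    }
}
```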
> So since the chunkSize can be as high as Int.MaxValue, the minimum supply to unblock the semaphore should be Int.MaxValue + outputLong

Key word being "can". Wouldn't chunkSize + outputLong be sufficient?
Yeah, that should work. The test still passes; I'll change it to outputLong * 2 since chunkSize == outputLong.
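Presumably the resulting change would look roughly like this (sketch only, reusing the names from the diff quoted above):

```scala
// Sketch of the change described above: since chunkSize == outputLong on this code
// path, the release no longer needs Int.MaxValue.
def endSupply(result: Either[Throwable, Unit]): F2[Unit] =
  buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(outputLong * 2)
```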
Have you stress-tested the latency of many fs2 streams with groupWithin?
Hey @He-Pin, may I ask what you mean exactly? Running many streams concurrently and using groupWithin?
E.g., using groupWithin to build a lock-step game server where the latency matters.
Thanks for the clarification. I must admit that unfortunately I've never heard of a lock-step server, so I haven't done any work (or rather written any test) with this particular use case in mind. Also keep in mind that I'm fairly new to open source contributions, so maybe this is more of a question for the regular maintainers. Nonetheless, I'd be interested to find out more: can you point me towards an example or a beginner-friendly resource? Thank you 🙏🏾
@He-Pin you are welcome to contribute some benchmarks for your use case; that will probably be the best way to answer your question :)
Keep performance in mind, this is the way, Mandalorian.
That's true; for my use case I need to use a hashed timer instead. I'm learning more CE code too, will PR later.
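Not an existing fs2 benchmark, but a rough, hypothetical starting point for the kind of latency measurement being discussed (all numbers are arbitrary):

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

object GroupWithinLatencySketch extends IOApp.Simple {
  // Wall-clock time for one stream that trickles elements through groupWithin.
  def oneStream: IO[FiniteDuration] =
    for {
      start <- IO.monotonic
      _     <- Stream
                 .awakeEvery[IO](1.millis)
                 .take(1000)
                 .groupWithin(16, 5.millis)
                 .compile
                 .drain
      end   <- IO.monotonic
    } yield end - start

  def run: IO[Unit] =
    Stream
      .range(0, 100)                          // 100 streams running concurrently
      .covary[IO]
      .parEvalMapUnordered(100)(_ => oneStream)
      .compile
      .toList
      .flatMap(times => IO.println(s"slowest stream: ${times.max}"))
}
```

A real latency benchmark would probably measure per-element time from production to emission (e.g. via a JMH harness), but this at least exercises many concurrent groupWithin timers at once.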
Thanks for chasing this one down! The new tests specifying the expected behavior are great 👍
Summary
Changes
- increasing supply by Int.MaxValue + chunkSize on upstream finalization, to prevent the supply semaphore from blocking when upstream is interrupted while downstream is waiting for the timeout to expire or to have enough elements

Notes
- stress test: all elements are processed. The test is nothing but a copy of the benchmark with an integrity check at the end. While it may seem redundant (i.e. "should never lose any elements" should cover it), in reality it is possible to write an implementation that passes that test but fails this one. (On a side note, that explains the big performance gain of the implementation linked earlier, so having this test in place could prevent being misled when looking at benchmark results while working on a new implementation 🙏🏾)
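For readers who want the flavor of that integrity check, a minimal hypothetical sketch (not the PR's actual test; names and numbers are made up):

```scala
// Sketch: push a known number of elements through groupWithin and assert that every
// one of them comes out the other side, in order and exactly once.
test("groupWithin does not lose or duplicate elements under load") {
  val total = 100000
  Stream
    .range(0, total)
    .covary[IO]
    .groupWithin(256, 1.milli)
    .flatMap(Stream.chunk)      // flatten the chunks back into individual elements
    .compile
    .toVector
    .map(out => assertEquals(out, Vector.range(0, total)))
}
```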