-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
1.x: fix counted buffer and window backpressure #3678
Merged
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This was referenced Feb 9, 2016
This was referenced Mar 14, 2016
return; | ||
} | ||
|
||
// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accidental?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't remember what I wanted to say here. Doesn't really matter.
akarnokd
force-pushed
the
BufferWindowRequestFix1x
branch
from
March 17, 2016 21:33
e9c6d49
to
daa8d2e
Compare
Fixed all 3 sites of |
akarnokd
force-pushed
the
BufferWindowRequestFix1x
branch
from
March 17, 2016 22:19
daa8d2e
to
be8d144
Compare
👍 |
1 similar comment
👍 |
akarnokd
added a commit
that referenced
this pull request
Mar 18, 2016
1.x: fix counted buffer and window backpressure
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR fixes the backpressure behavior of the counted
buffer
andwindow
operators and consists of several changes.The main issue lies when
count > skip
in the operators, yielding overlapping buffers/windows.For
buffer
, when the upstream completed, the logic emitted all remaining partial buffers even if there was no request for new buffers, which can result inMissingBackpressureException
somewhere. The proper handling of the final buffers required a new backpressure management algorithm which is now part of theBackpressureUtils
class and consists of two new methods:postCompleteDone
called from onComplete to take over the emission of queued values andpostCompleteRequest
which manages requests before and after the completed state.For
window
, the new window opened was emitted regardless of requests which was common due to request-amplification (i.e., requesting n windows results in requestingcount + skip * (n - 1)
elements at first (thenskip * n
later) which opensceil(count / skip)
windows upfront. To avoid the overflow, the individual windows have to go through the usual queue/drain logic as well. I've also updated the Javadoc to reflect the backpressure behavior along with parameter validation.In addition, the window case didn't manage cancellation properly. When the outer observable is unsubscribed, the inner subscribers may be still going and thus cancelling the upstream would stop/hang the inner windows. Instead, the open window count is tracked (also counting the outer as 1 window) and when all get unsubscribed (i.e., count reaches zero), the upstream is unsubscribed. To accomplish this, the
UnicastSubject
had to be retrofitted with a new optional callbackAction0
which gets called at most once whenever eitheronError
oronCompleted
is called or when the singleSubscriber
unsubscribes.A secondary issue was with the
TestSubscriber
's initial request; some upstream operators could get triggered withLong.MAX_VALUE
despite the initial request amount was set. This PR changes it to be set at construction time instead of inonStart
.