Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: fix counted buffer and window backpressure #3678

Merged
merged 1 commit into from
Mar 18, 2016

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Feb 8, 2016

This PR fixes the backpressure behavior of the counted buffer and window operators and consists of several changes.

The main issue lies when count > skip in the operators, yielding overlapping buffers/windows.

For buffer, when the upstream completed, the logic emitted all remaining partial buffers even if there was no request for new buffers, which can result in MissingBackpressureException somewhere. The proper handling of the final buffers required a new backpressure management algorithm which is now part of the BackpressureUtils class and consists of two new methods: postCompleteDone called from onComplete to take over the emission of queued values and postCompleteRequest which manages requests before and after the completed state.

For window, the new window opened was emitted regardless of requests which was common due to request-amplification (i.e., requesting n windows results in requesting count + skip * (n - 1) elements at first (then skip * n later) which opens ceil(count / skip) windows upfront. To avoid the overflow, the individual windows have to go through the usual queue/drain logic as well. I've also updated the Javadoc to reflect the backpressure behavior along with parameter validation.

In addition, the window case didn't manage cancellation properly. When the outer observable is unsubscribed, the inner subscribers may be still going and thus cancelling the upstream would stop/hang the inner windows. Instead, the open window count is tracked (also counting the outer as 1 window) and when all get unsubscribed (i.e., count reaches zero), the upstream is unsubscribed. To accomplish this, the UnicastSubject had to be retrofitted with a new optional callback Action0 which gets called at most once whenever either onError or onCompleted is called or when the single Subscriber unsubscribes.

A secondary issue was with the TestSubscriber's initial request; some upstream operators could get triggered with Long.MAX_VALUE despite the initial request amount was set. This PR changes it to be set at construction time instead of in onStart.

@akarnokd
Copy link
Member Author

Ping @stevegury @stealthcode @zsxwing

@egor-n
Copy link

egor-n commented Mar 14, 2016

@iNoles I think that's GitHub's fault. You can see here that the line is fine.

return;
}

//
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accidental?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember what I wanted to say here. Doesn't really matter.

@akarnokd
Copy link
Member Author

Fixed all 3 sites of new ArrayList

@stevegury
Copy link
Member

👍

1 similar comment
@ZacSweers
Copy link
Contributor

👍

akarnokd added a commit that referenced this pull request Mar 18, 2016
1.x: fix counted buffer and window backpressure
@akarnokd akarnokd merged commit fd2da39 into ReactiveX:1.x Mar 18, 2016
@akarnokd akarnokd deleted the BufferWindowRequestFix1x branch March 18, 2016 07:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants