Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OnBackpressureXXX: support for common drain manager & fix for former concurrency bugs #1955

Merged
merged 7 commits into from
Feb 3, 2015

Conversation

akarnokd
Copy link
Member

This is quite a complex operator with lots of cases.

Properties:

  1. If there aren't any elements queued up and nothing is requested but terminal event received, emit terminal event and quit.
  2. If there are elements in the queue and a terminal flag, and at least the same amount is requested, deliver the events and the terminal event.
  3. If more was requested and more became available just after the loop and before the synchronized block, keep looping.
    3.a) If more was requested but nothing is available or nothing was requested and something is available: quit and let either the onNext or request do the subsequent drain.
    3.b) If elements and termination was produced but not requested: quit and let the request do the drain
    3.c) If termination was requested and no elements produced: loop , emit terminal event and quit.

In table form:

Available | Terminated | Requested | Action | Reason
   yes         yes          yes       loop    can deliver available
   yes         yes          false     quit    can't deliver available
   yes         no           yes       loop    can deliver available
   yes         no           no        quit    can't deliver available
   no          yes          yes       loop    loop will deliver termination only and quit
   no          yes          no        loop    loop will deliver termination only and quit
   no          no           yes       quit    nothing to deliver
   no          no           no        quit    nothing to deliver

@akarnokd
Copy link
Member Author

This is complicated stuff, maybe an (internal) helper class should be created to help with future producer-backpressure-consumer management.

reimplemented Buffer and Block strategies with it.
@akarnokd
Copy link
Member Author

I hope the new BackpressureDrainManager provides a common abstraction that let's build backpressure-providing operators that themselves do buffering of some sorts.

@benjchristensen
Copy link
Member

Wow, this is some non-trivial stuff. Thanks for tackling this. It's going to take me a bit to grok it.

@benjchristensen
Copy link
Member

Anyone else have the time and interest to also do a code review on the concurrency code in this? I'd appreciate more eyes than my own.


private AtomicBoolean saturated = new AtomicBoolean(false);
// if child unsubscribes it should unsubscribe the parent, but not the other way around
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this use case why would we ever be unsubscribing early? We can emit an onError, but I don't see the need to decouple the subscription.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertCapacity calls unsubscribe and would disrupt downstream.

@benjchristensen
Copy link
Member

I think this code is good to merge. I've asked a variety of clarifying questions that I want to review with you before I merge and release.

@@ -0,0 +1,241 @@
/*
* Copyright 2011 David Karnok
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong copyright, should be Netflix

@akarnokd akarnokd changed the title Fixed race & late termination condition. OnBackpressureXXX: support for common drain manager & fix for former concurrency bugs Jan 20, 2015
@akarnokd
Copy link
Member Author

Re-trigger travis.

}
@Override
public Object poll() {
return queue.poll();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add if (capacity != null) capacity.incrementAndGet(); here.

@zsxwing
Copy link
Member

zsxwing commented Jan 26, 2015

Could you change CRLFs to LFs in BackpressureDrainManager?

* @return true if a terminal state has been reached
*/
public final boolean isTerminated() {
return terminated;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isTerminated, terminate, terminate(Throwable) are not used. Could you explain why adding them?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really a complex class. So I prefer to keep it as simple as possible.

@akarnokd
Copy link
Member Author

Looks a.accept(o) will always return false. Right?

We can't know. The consumer way want to terminate after a certain item has been accepted.

isTerminated, terminate, terminate are not used. Could you explain why adding them?

This is a base class, some operators may need to do something after termination and before calling drain.

This is really a complex class. So I prefer to keep it as simple as possible.

This is an internal base class to help implement backpressure over an abstract queue. The unavoidable complexity is in drain().

@Override
public Object poll() {
Object value = queue.poll();
if (capacity != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry that I missed a case. If we poll null, we should not increment capacity.

@zsxwing
Copy link
Member

zsxwing commented Jan 27, 2015

LGTM now.

@akarnokd
Copy link
Member Author

@benjchristensen your review would be welcome.

akarnokd added a commit that referenced this pull request Feb 3, 2015
OnBackpressureXXX: support for common drain manager & fix for former concurrency bugs
@akarnokd akarnokd merged commit 2f06736 into ReactiveX:1.x Feb 3, 2015
@akarnokd akarnokd deleted the OnBackpressureBlockFix branch February 3, 2015 09:00
@benjchristensen benjchristensen mentioned this pull request Feb 3, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants