Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: PublishSubject fail-fast when backpressured #4225

Merged
merged 2 commits into from
Jul 27, 2016

Conversation

akarnokd
Copy link
Member

This PR modifies the PublishSubject to fail fast if the child Subscriber can't keep up. Therefore, instead of some other operator failing somewhere downstream, the MissingBackpressureException now points to the PublishSubject instead.

In addition, there were complaints in #3850 that cross-unsubscription doesn't stop another Subscriber from receiving events if it comes after the unsubscribe() call in the dispatch loop. Since PublishSubject now tracks request - which is the main extra overhead - it is trivial to add the necessary eager check for the unsubscribed state.

Benchmark comparison (i7 4790, Windows 7 x64, Java 8u102)

image

As expected, this adds some overhead although most noticeably for the mid-range only. Short-lived publishing is now slightly faster even.

Interestingly, many benchmarks behave oddly in these 1000s range - we could be hitting some JIT threshold. While in other benchmarks, the warmup iteration numbers keep increasing as JIT does its work but here, it starts out quite nicely then drops 25% and stays that way. I'm on windows so JMH -perfasm doesn't work.

If this direction is accepted, I'll update BehaviorSubject, timer() and interval() do do the same tracking.

@akarnokd akarnokd added this to the 1.2 milestone Jul 22, 2016
@codecov-io
Copy link

codecov-io commented Jul 22, 2016

Current coverage is 84.12% (diff: 100%)

Merging #4225 into 1.x will decrease coverage by <.01%

@@                1.x      #4225   diff @@
==========================================
  Files           265        265          
  Lines         17305      17377    +72   
  Methods           0          0          
  Messages          0          0          
  Branches       2624       2643    +19   
==========================================
+ Hits          14559      14619    +60   
+ Misses         1893       1891     -2   
- Partials        853        867    +14   

Powered by Codecov. Last update 479df31...b094b0b

pp.onError(e);
} catch (Throwable ex) {
if (errors == null) {
errors = new ArrayList<Throwable>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: (1)

@artem-zinnatullin
Copy link
Contributor

👍

// I'm afraid that this will break some amount of user code, but this has to be done…

@akarnokd
Copy link
Member Author

// I'm afraid that this will break some amount of user code, but this has to be done…

To paraphrase Godzilla (2014): Let them fail (early)! :)

@stevegury
Copy link
Member

👍

@akarnokd akarnokd merged commit b72beff into ReactiveX:1.x Jul 27, 2016
@akarnokd akarnokd deleted the PublishSubjectBackpressureFailFast branch July 27, 2016 21:08
@felipecsl
Copy link

The recent changes to ReplaySubjectafter v1.1.5 broke my unit tests.
Not sure if caused by this PR (#4225), #4023 or #3918. Probably #3918 since it started after I upgraded to 1.1.6.
In my specific case, I was reusing the same TestSubscriber after unsubscribing and then subscribing again. In this case, I'm no longer getting events on the TestSubscriber after resubscribing it. This is roughly what I'm doing:

ReplaySubject<String> subject = ReplaySubject.create();
TestSubscriber subscriber = new TestSubscriber();
Subscription subscription = subject.subscribe(subscriber);
subscription.unsubscribe();
subject.onNext("foo");
subject.onCompleted();
subject.subscribe(subscriber);
// Assertions below pass with v1.1.5 but fail with 1.1.6 and above
subscriber.assertCompleted();
subscriber.assertValue("foo");

Questions:

  1. Is this the expected behavior or a bug?
  2. If expected, is there any way I can reuse the same subscriber again after it has been unsubscribed?

Thanks!

@felipecsl
Copy link

Alright apparently using TestObserver instead of TestSubscriber fixes the problem, although that one is deprecated. Looks like you're not supposed to reuse a TestSubscriber?
I ended up rolling my own TestObserver that has the same convenience methods as TestSubscriber, like assertNoErrors(), assertCompleted(), etc.
It seems to me that TestObserver should not be deprecated because it seems useful in such cases. What do you think?

felipecsl added a commit to airbnb/RxGroups that referenced this pull request Aug 28, 2016
Test changes were required since RxJava 1.1.6 changed the way
ReplaySubject in a way that broke reusing the same `TestSubscriber` more
than once. (See comments here:
ReactiveX/RxJava#4225 (comment))
Fixed tests by using a `TestObserver` that can be reused multiple times.
@akarnokd
Copy link
Member Author

akarnokd commented Aug 28, 2016

You are not supposed to reuse Subscriber ever. Subscribe with a new TestSubscriber and assert on that.

felipecsl added a commit to airbnb/RxGroups that referenced this pull request Aug 28, 2016
* Bump buildTools, sdkVersion and dependencies

Test changes were required since RxJava 1.1.6 changed the way
ReplaySubject in a way that broke reusing the same `TestSubscriber` more
than once. (See comments here:
ReactiveX/RxJava#4225 (comment))
Fixed tests by using a `TestObserver` that can be reused multiple times.
@mikehearn
Copy link

My reading of this change is that it's no longer possible to emit on a PublishSubject that has no observers (yet). If so, isn't that a rather large behaviour/API break - a lot more than just failing early?

This change broke our tests, but I don't see any discussion of this possibility above. Was it considered? I don't see anything in the javadocs that suggests that emitting on an unsubscribed subject is a problem.

@akarnokd
Copy link
Member Author

akarnokd commented Jan 4, 2017

My reading of this change is that it's no longer possible to emit on a PublishSubject that has no observers (yet)

PublishSubject considers each of the subscribed Subscribers individually and sending an onError if that particular Subscriber is not ready to receive onNext events. This doesn't prevent you from emitting onNext events when there is no Subscriber as these events are simply dropped by PublishSubject since day 1.

This change broke our tests, but I don't see any discussion of this possibility above. Was it considered? I don't see anything in the javadocs that suggests that emitting on an unsubscribed subject is a problem.

Without seeing what your unit tests did it's hard to say what's wrong with them.

@mikehearn
Copy link

Indeed, my mistake, sorry, I got confused reading the diff and thought it was going straight from the outer onNext to the inner producer class onNext, d'oh. I guess something subscribed has missing backpressure somewhere. It's hard to tell from the error where the root cause is though, as the exception is thrown by the PublishSubject and it doesn't say much about which subscriber had problems.

@akarnokd
Copy link
Member Author

akarnokd commented Jan 4, 2017

Enable assembly tracking with RxJavaHooks.enableAssemblyTracking() around a unit test that fails and you'll get more detailed exceptions about what failed (most likely). But first make sure you don't call onNext concurrently on the PublishSubject.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants