Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Subject NPE fixes, add UnicastProcessor TCK #5760

Merged
merged 4 commits into from
Dec 15, 2017

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Dec 11, 2017

This PR adds the Subject.refCount() and FlowableProcessor.refCount() that capture the upstream's Disposable/Subscription and disposes/cancels them if the number of Observers/Subscribers decreases to zero. The Reactive-Streams TCK and thus other implementations may expect such behavior from a Processor implementation and this wrapper is required to pass the TCK tests.

While implementing the TCK tests, it turned out the null-handling of the Subjects and FlowableProcessors were not following the Reactive-Streams specification. They have to throw a NullPointerException immediately and not turn them into NPEs for the downstream. These classes and the tests have been fixed as well. Their error messages have been uniformed too.

@akarnokd akarnokd added this to the 2.2 milestone Dec 11, 2017
@codecov
Copy link

codecov bot commented Dec 11, 2017

Codecov Report

Merging #5760 into 2.x will decrease coverage by 0.04%.
The diff coverage is 100%.

Impacted file tree graph

@@             Coverage Diff             @@
##                2.x   #5760      +/-   ##
===========================================
- Coverage     96.24%   96.2%   -0.05%     
+ Complexity     5838    5808      -30     
===========================================
  Files           634     634              
  Lines         41653   41607      -46     
  Branches       5769    5770       +1     
===========================================
- Hits          40090   40028      -62     
- Misses          624     636      +12     
- Partials        939     943       +4
Impacted Files Coverage Δ Complexity Δ
...ava/io/reactivex/processors/FlowableProcessor.java 100% <ø> (ø) 3 <0> (ø) ⬇️
...ain/java/io/reactivex/subjects/PublishSubject.java 97.8% <100%> (-2.2%) 38 <6> (-3)
...in/java/io/reactivex/subjects/BehaviorSubject.java 86.24% <100%> (-0.74%) 54 <6> (-3)
.../main/java/io/reactivex/subjects/MaybeSubject.java 100% <100%> (ø) 46 <6> (-2) ⬇️
...java/io/reactivex/subjects/CompletableSubject.java 100% <100%> (ø) 36 <3> (-1) ⬇️
...java/io/reactivex/processors/UnicastProcessor.java 95.83% <100%> (-0.08%) 62 <5> (-2)
...n/java/io/reactivex/processors/AsyncProcessor.java 99.08% <100%> (-0.07%) 52 <5> (-4)
...main/java/io/reactivex/subjects/ReplaySubject.java 97.78% <100%> (+0.2%) 49 <6> (-2) ⬇️
...ava/io/reactivex/processors/BehaviorProcessor.java 89.23% <100%> (-6.79%) 59 <6> (-3)
...ain/java/io/reactivex/subjects/UnicastSubject.java 98.1% <100%> (-0.66%) 61 <6> (-4)
... and 39 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update b338ffe...e80ab32. Read the comment docs.

@@ -175,10 +175,8 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😰😰😰 well this is a little bit breaking change, but since it was always part of RS spec I guess it falls into patch category for RxJava

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You were not supposed to call onNext with null anyway and there is no reason to take this extra long path to get the end subscriber a NullPointerException via onError.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep I understand :)

Just noted that it's a little bit breaking behavior for existing users, but since RS doesn't allow nulls anyway we can make this change as "patch" rather than wait for 3.x

@artem-zinnatullin
Copy link
Contributor

I like RS spec-related fixes 👍

But not sure about motivation for having refCount() specific for processors/subjects, do you have some real-world use-case scenario on mind?

@akarnokd
Copy link
Member Author

Other than such the behavior is needed to pass the TCK, none that I can think of. The original idea behind Processor is that it acts as a transformation step and would service at most 1 Subscriber by that. If that solo Subscriber cancels, it is reasonable that cancellation travels upstream. For us, we don't use Processors but basically chains of Subscriber -> Subscriber -> Subscriber where the mentioned cancellation is wired up nonetheless.

I can move the operator into the test section and not extend the Subject/FlowableProcessor classes.

@artem-zinnatullin
Copy link
Contributor

I can move the operator into the test section and not extend the Subject/FlowableProcessor classes

That sounds much better to me tbh, I just don't want us to decline PRs with new operators from users and suggest them contribute to rxjava-extras/etc and at the same time add new ones that no user really asked for

Nothing personal :)

@akarnokd
Copy link
Member Author

Okay, I'll have a copy of the processor version in test and move both into RxJava2Extensions.

@artem-zinnatullin
Copy link
Contributor

Thanks, David! Appreciate that

@akarnokd akarnokd changed the title 2.x: add Subject/Processor refCount(), Subject NPE fixes 2.x: Subject NPE fixes, add UnicastProcessor TCK Dec 15, 2017
@akarnokd
Copy link
Member Author

Done.

Copy link
Contributor

@artem-zinnatullin artem-zinnatullin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just few small comments, otherwise lgtm 👍

.assertNoValues()
.assertError(NullPointerException.class)
.assertErrorMessage("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
p.test().assertEmpty().cancel();;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit ;; (and in other places in this PR)

* @param <T> the upstream and downstream value type
* @since 2.1.8 - experimental
*/
@Experimental
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove?

@artem-zinnatullin
Copy link
Contributor

// Now started watching rxjava-extras, feel free to ping me if you need review on a PR there

@akarnokd akarnokd merged commit 7ba9a3e into ReactiveX:2.x Dec 15, 2017
@akarnokd akarnokd deleted the SubjectRefCount branch December 15, 2017 10:13
@akarnokd
Copy link
Member Author

I put my excess things into RxJava2Extensions btw. rxjava2-extras is @davidmoten 's repository.

@artem-zinnatullin
Copy link
Contributor

yep, I've mixed up naming but subscribed to yours. Somehow I've never needed to use neither of extensions/extras for all these years 🤔

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants