Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add MulticastProcessor #6002

Merged
merged 3 commits into from
May 17, 2018
Merged

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented May 9, 2018

This PR adds the MulticastProcessor from the extensions project to be a standard processor option.

This type of processor fills the gap of having a backpressure-coordinating processor type as PublishProcessor doesn't coordinate backpressure on its own and Flowable.publish() often can't be used because the upstream may not yet exist when the dowstream consumers are setup.

MulticastProcessor

Example:

MulticastProcessor<Integer> mp = Flowable.range(1, 10)
    .subscribeWith(MulticastProcessor.create());

mp.test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// --------------------

MulticastProcessor<Integer> mp2 = MulticastProcessor.create(4);
mp2.start();

assertTrue(mp2.offer(1));
assertTrue(mp2.offer(2));
assertTrue(mp2.offer(3));
assertTrue(mp2.offer(4));

assertFalse(mp2.offer(5));

mp2.onComplete();

mp2.test().assertResult(1, 2, 3, 4);

Resolves: #5999

@akarnokd akarnokd added this to the 2.2 milestone May 9, 2018
@codecov
Copy link

codecov bot commented May 9, 2018

Codecov Report

Merging #6002 into 2.x will increase coverage by 0.01%.
The diff coverage is 98.67%.

Impacted file tree graph

@@             Coverage Diff              @@
##                2.x    #6002      +/-   ##
============================================
+ Coverage     98.25%   98.27%   +0.01%     
- Complexity     6062     6158      +96     
============================================
  Files           656      659       +3     
  Lines         44101    44514     +413     
  Branches       6118     6201      +83     
============================================
+ Hits          43333    43745     +412     
- Misses          230      231       +1     
  Partials        538      538
Impacted Files Coverage Δ Complexity Δ
...va/io/reactivex/processors/MulticastProcessor.java 98.67% <98.67%> (ø) 84 <84> (?)
...l/operators/observable/ObservableFlatMapMaybe.java 85.62% <0%> (-4.58%) 2% <0%> (ø)
...tivex/internal/observers/FutureSingleObserver.java 94.33% <0%> (-3.78%) 24% <0%> (-1%)
.../operators/observable/ObservableFlatMapSingle.java 88.8% <0%> (-3.74%) 2% <0%> (ø)
...rnal/operators/flowable/FlowableSkipLastTimed.java 95.91% <0%> (-2.05%) 2% <0%> (ø)
...nternal/operators/parallel/ParallelReduceFull.java 93.06% <0%> (-1.99%) 2% <0%> (ø)
...perators/single/SingleFlatMapIterableFlowable.java 96.66% <0%> (-1.67%) 2% <0%> (ø)
...rnal/subscriptions/DeferredScalarSubscription.java 98.46% <0%> (-1.54%) 28% <0%> (-1%)
...a/io/reactivex/internal/util/QueueDrainHelper.java 98.61% <0%> (-1.39%) 57% <0%> (-1%)
.../operators/mixed/FlowableSwitchMapCompletable.java 98.94% <0%> (-1.06%) 2% <0%> (ø)
... and 24 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update cedfc53...8f484aa. Read the comment docs.

Copy link
Collaborator

@vanniktech vanniktech left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somehow I missed this PR

* upstream when all {@link Subscriber}s have cancelled. Late {@code Subscriber}s will then be
* immediately completed.
* <p>
* Because of {@code MulticastProcessor} implements the {@link Subscriber} interface, calling
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd nuke the of here

* {@code onSubscribe} is mandatory (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>).
* If {@code MulticastProcessor} shoud run standalone, i.e., without subscribing the {@code MulticastProcessor} to another {@link Publisher},
* use {@link #start()} or {@link #startUnbounded()} methods to initialize the internal buffer.
* Failing to do so will lead to {@link NullPointerException} at runtime.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lead to a

public static <T> MulticastProcessor<T> create() {
return new MulticastProcessor<T>(bufferSize(), false);
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: double new line

* @param <T> the input and output value type
* @return the new MulticastProcessor instance
*/
public static <T> MulticastProcessor<T> create(int bufferSize) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add the @CheckReturnValue annotation here

* is cancelled
* @return the new MulticastProcessor instance
*/
public static <T> MulticastProcessor<T> create(boolean refCount) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add the @CheckReturnValue annotation here

* @param <T> the input and output value type
* @return the new MulticastProcessor instance
*/
public static <T> MulticastProcessor<T> create(int bufferSize, boolean refCount) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add the @CheckReturnValue annotation here

}

/**
* Constructs a fresh instance with the given prefetch amount and the optional
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no real need of having the documentation here, is there? It's not a public method and the create method already cover everything

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may help those who dig into the source code for some reason (learning, debugging, etc.). I'll leave it there.

@Override
public void onError(Throwable t) {
if (t == null) {
throw new NullPointerException("t is null");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we also have another second sentence that was stating that nulls are not allowed per definition?


mp.test().assertResult();
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: double new line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, removed.

.assertEmpty()
.requestMore(1)
.assertValues(0)
.assertNotComplete()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you just use assertValuesOnly and then save the assertNotComplete call?

@akarnokd akarnokd merged commit f87879d into ReactiveX:2.x May 17, 2018
@akarnokd akarnokd deleted the MulticastProcessor branch May 17, 2018 21:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants