Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add efficient mergeWith(Single|Maybe|Completable) overloads. #5847

Merged
merged 3 commits into from
Feb 19, 2018

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Feb 9, 2018

This PR adds specialized overloads to the mergeWith operator in Flowable and Observable.

If accepted, the marbles will be updated in a separate PR.

Related: #5350.

@akarnokd akarnokd added this to the 2.2 milestone Feb 9, 2018
@codecov
Copy link

codecov bot commented Feb 9, 2018

Codecov Report

Merging #5847 into 2.x will increase coverage by 0.09%.
The diff coverage is 99.84%.

Impacted file tree graph

@@            Coverage Diff             @@
##               2.x   #5847      +/-   ##
==========================================
+ Coverage     96.4%   96.5%   +0.09%     
- Complexity    5834    5852      +18     
==========================================
  Files          640     646       +6     
  Lines        41944   42608     +664     
  Branches      5804    5906     +102     
==========================================
+ Hits         40438   41119     +681     
+ Misses         582     576       -6     
+ Partials       924     913      -11
Impacted Files Coverage Δ Complexity Δ
...nal/operators/flowable/FlowableMergeWithMaybe.java 100% <100%> (ø) 2 <2> (?)
src/main/java/io/reactivex/Flowable.java 100% <100%> (ø) 532 <3> (+3) ⬆️
src/main/java/io/reactivex/Observable.java 100% <100%> (ø) 515 <3> (+3) ⬆️
...al/operators/flowable/FlowableMergeWithSingle.java 100% <100%> (ø) 2 <2> (?)
...perators/observable/ObservableMergeWithSingle.java 100% <100%> (ø) 2 <2> (?)
...ors/observable/ObservableMergeWithCompletable.java 100% <100%> (ø) 2 <2> (?)
...erators/flowable/FlowableMergeWithCompletable.java 100% <100%> (ø) 2 <2> (?)
...operators/observable/ObservableMergeWithMaybe.java 99.1% <99.1%> (ø) 2 <2> (?)
.../operators/completable/CompletableConcatArray.java 93.75% <0%> (-6.25%) 2% <0%> (ø)
.../operators/flowable/FlowableBlockingSubscribe.java 91.89% <0%> (-5.41%) 9% <0%> (-1%)
... and 40 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 12c0e30...6bd71c9. Read the comment docs.

Copy link
Collaborator

@vanniktech vanniktech left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like the overloads 👍

void otherSuccess(T value) {
if (compareAndSet(0, 1)) {
actual.onNext(value);
otherState = 2;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we go and use some constants here to improve readability? It's really hard to follow.


@Test
public void normal() {
final TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be inlined here by using test()


@Test
public void normal() {
final TestObserver<Integer> ts = new TestObserver<Integer>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it's an to and not ts


@Test
public void cancel() {
final PublishSubject<Integer> pp = PublishSubject.create();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: ps not pp


@Test
public void normal() {
final TestObserver<Integer> ts = new TestObserver<Integer>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same


@Test
public void normal() {
final TestObserver<Integer> ts = new TestObserver<Integer>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: same


emitted = e;
consumed = c;
missed = addAndGet(-missed);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see requested not being reduced by emitted amount and emitted is now non-volatile field. A bit different to older patterns I am familiar with. Looks handy to eliminate compareAndSet calls on requested from this spot. Nice.

if (compareAndSet(0, 1)) {
long e = emitted;
if (requested.get() != e) {
SimplePlainQueue<T> q = queue;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure don't need to call getOrCreateQueue() here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. If there is no queue yet or is empty, that means T can be emitted immediately and events from the main source won't be reordered. Otherwise the queue exists and the item should be queued.

return;
}
}
drainLoop();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised to see this call drainLoop() here rather than drain (and in otherSuccess()) . Are you sure access to drainLoop will be serialized?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the execution reaches this, the work-in-progress indicator is known to be non-zero as we are in the drain mode already.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, thanks

@akarnokd
Copy link
Member Author

Conflict resolved. There was an expected merge conflict with the concatWith PR as they both touched the same test file.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants