Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix Flowable.publish(-|Function) subscriber swap possible data loss #5893

Merged
merged 1 commit into from
Mar 7, 2018

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Mar 6, 2018

This PR fixes an avoidable dataloss in the following subscriber-swap scenario with the publish() and publish(Function<Flowable, Publisher>).

When an onNext changes the current array of subscribers (an existing consumer cancelled or a new one arrived), the change detection is delayed and items may get dropped even though a fresh consumer could take those values.

The algorithms were updated to fix this case as well as the emission tracking in FlowableMulticastProcessor: a global emitted value is not good here after all (subscribers joining late would indicate an incorrect aggregate demand and get overflown).

One of the advanced uses of publish(Function) is to implement consumption mode changes by having a mode cancel the subscription to the shared Flowable and synchronously subscribe a new consumer with a different behavior.

The discovery of this shortcoming was due to a special transformation pattern: apply a transformer if the source is not empty, and in this case, deliver all items of it.

FlowableTransformer<T, U> transformer = ...
source.publish(shared ->
   // let's see if there is at least 1 source item
   shared.take(1)
   // if there is one item, stop this phase and keep the item
   .concatMap(first ->
       // given the very first item, apply the transformation
       // on the "original" sequence by reattaching the first
       // item to the rest of the source
       transformer.apply(shared.startWith(first))
   )
)
.subscribe(/* ... */);

@akarnokd akarnokd added this to the 2.2 milestone Mar 6, 2018
@codecov
Copy link

codecov bot commented Mar 6, 2018

Codecov Report

Merging #5893 into 2.x will decrease coverage by 0.01%.
The diff coverage is 100%.

Impacted file tree graph

@@             Coverage Diff              @@
##                2.x    #5893      +/-   ##
============================================
- Coverage     97.92%   97.91%   -0.02%     
- Complexity     5985     5987       +2     
============================================
  Files           655      655              
  Lines         43836    43853      +17     
  Branches       6072     6075       +3     
============================================
+ Hits          42926    42938      +12     
- Misses          278      283       +5     
  Partials        632      632
Impacted Files Coverage Δ Complexity Δ
...l/operators/flowable/FlowablePublishMulticast.java 100% <100%> (ø) 2 <0> (ø) ⬇️
...x/internal/operators/flowable/FlowablePublish.java 97.44% <100%> (+1.41%) 11 <0> (ø) ⬇️
...l/operators/observable/ObservableFlatMapMaybe.java 88.23% <0%> (-6.54%) 2% <0%> (ø)
...in/java/io/reactivex/subjects/BehaviorSubject.java 96.23% <0%> (-2.69%) 54% <0%> (ø)
...ternal/operators/completable/CompletableMerge.java 96.42% <0%> (-2.39%) 2% <0%> (ø)
...activex/internal/schedulers/ExecutorScheduler.java 97.97% <0%> (-2.03%) 9% <0%> (ø)
...perators/single/SingleFlatMapIterableFlowable.java 96.66% <0%> (-1.67%) 2% <0%> (ø)
...ernal/operators/flowable/FlowableFromIterable.java 93.04% <0%> (-1.07%) 5% <0%> (ø)
...perators/observable/ObservableMergeWithSingle.java 99.06% <0%> (-0.94%) 2% <0%> (ø)
...operators/observable/ObservableMergeWithMaybe.java 99.1% <0%> (-0.9%) 2% <0%> (ø)
... and 13 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update cb05a26...cc6fdb9. Read the comment docs.

@@ -941,4 +941,365 @@ protected void subscribeActual(Subscriber<? super Integer> s) {
ref.get().add(new InnerSubscriber<Integer>(new TestSubscriber<Integer>()));
ref.get().remove(null);
}

@Test
@Ignore("publish() keeps consuming the upstream if there are no subscribers, 3.x should change this")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we document this somewhere in one of the 3.x issues?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Posted as #5899.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants