-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2.x: Fix Flowable.publish(-|Function) subscriber swap possible data loss #5893
Conversation
Codecov Report
@@ Coverage Diff @@
## 2.x #5893 +/- ##
============================================
- Coverage 97.92% 97.91% -0.02%
- Complexity 5985 5987 +2
============================================
Files 655 655
Lines 43836 43853 +17
Branches 6072 6075 +3
============================================
+ Hits 42926 42938 +12
- Misses 278 283 +5
Partials 632 632
Continue to review full report at Codecov.
|
@@ -941,4 +941,365 @@ protected void subscribeActual(Subscriber<? super Integer> s) { | |||
ref.get().add(new InnerSubscriber<Integer>(new TestSubscriber<Integer>())); | |||
ref.get().remove(null); | |||
} | |||
|
|||
@Test | |||
@Ignore("publish() keeps consuming the upstream if there are no subscribers, 3.x should change this") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we document this somewhere in one of the 3.x issues?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Posted as #5899.
This PR fixes an avoidable dataloss in the following subscriber-swap scenario with the
publish()
andpublish(Function<Flowable, Publisher>)
.When an
onNext
changes the current array of subscribers (an existing consumer cancelled or a new one arrived), the change detection is delayed and items may get dropped even though a fresh consumer could take those values.The algorithms were updated to fix this case as well as the emission tracking in
FlowableMulticastProcessor
: a globalemitted
value is not good here after all (subscribers joining late would indicate an incorrect aggregate demand and get overflown).One of the advanced uses of
publish(Function)
is to implement consumption mode changes by having a mode cancel the subscription to the sharedFlowable
and synchronously subscribe a new consumer with a different behavior.The discovery of this shortcoming was due to a special transformation pattern: apply a transformer if the source is not empty, and in this case, deliver all items of it.