Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes to the operators. #1137

Merged
merged 1 commit into from
May 5, 2014
Merged

Conversation

akarnokd
Copy link
Member

Fixes for issue #1136

  • OperatorMulticast is straightforward from concurrency perspective. The only consideration is that if the current subscription gets unsubscribed before the connect reaches the unsafeSubscribe, it really depends on the source what it will do with an unsubscribed client. It is possible to put extra effort to make sure a newly established connection won't get unsubscribed before it is actually connected or if it gets unsubscribed immediately, no subscription is attempted at all.
  • OperatorSampleWithTime: didn't want to push too many PRs so I just included it here. There was a missing unsubscribe in the onCompleted() that makes sure the worker is stopped.
  • Subscribers.empty() was implemented by returning the same Subscriber to everyone, which Subscriber is stateful so if someone unsubscribes it, it will appear everywhere as unsubscribed and can have unwanted effects. There is no such problem with Observers.empty() as it is stateless. The change just uses Subscribers.from() to wrap Observers.empty() and every caller gets its own independent instance.
  • OperatorRefCount was a bit more tricky. Since it has a connection counter, one has to serialize subscriptions with unsubscriptions. However, it is possible a subscription gets unsubscribed before code reaches the connect check which may disrupt the connection counter. The solution is to keep track of the unsubscriptions that happen before the connection attempts and not change the counter in case of out-of-order behavior. The final aim was to avoid leaking the connection statuses if the unsafeSubscribe throws concurrently with a client unsubscribing by using weak tokens (integers wouldn't have worked as the first 0-127 are cached in the JVM and would never GC).

@cloudbees-pull-request-builder

RxJava-pull-requests #1050 SUCCESS
This pull request looks good

benjchristensen added a commit that referenced this pull request May 5, 2014
@benjchristensen benjchristensen merged commit 1af3674 into ReactiveX:master May 5, 2014
@benjchristensen
Copy link
Member

Thanks for tackling these complicated issues.

@akarnokd akarnokd deleted the OperatorFixes430 branch May 6, 2014 13:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants