Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscriptions Rewrite #661

Merged
merged 8 commits into from
Dec 23, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions rxjava-core/src/main/java/rx/joins/JoinObserver1.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Notification;
import rx.Observable;
import rx.subscriptions.SingleAssignmentSubscription;
import rx.operators.SafeObservableSubscription;
import rx.util.functions.Action1;

/**
Expand All @@ -33,14 +35,15 @@ public final class JoinObserver1<T> extends ObserverBase<Notification<T>> implem
private final Action1<Throwable> onError;
private final List<ActivePlan0> activePlans;
private final Queue<Notification<T>> queue;
private final SingleAssignmentSubscription subscription;
private final SafeObservableSubscription subscription;
private volatile boolean done;
private final AtomicBoolean subscribed = new AtomicBoolean(false);

public JoinObserver1(Observable<T> source, Action1<Throwable> onError) {
this.source = source;
this.onError = onError;
queue = new LinkedList<Notification<T>>();
subscription = new SingleAssignmentSubscription();
subscription = new SafeObservableSubscription();
activePlans = new ArrayList<ActivePlan0>();
}
public Queue<Notification<T>> queue() {
Expand All @@ -51,8 +54,12 @@ public void addActivePlan(ActivePlan0 activePlan) {
}
@Override
public void subscribe(Object gate) {
this.gate = gate;
subscription.set(source.materialize().subscribe(this));
if (subscribed.compareAndSet(false, true)) {
this.gate = gate;
subscription.wrap(source.materialize().subscribe(this));
} else {
throw new IllegalStateException("Can only be subscribed to once.");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,45 @@

import rx.Observable;
import rx.Subscription;
import rx.util.functions.Action0;

/**
* Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop if unsubscribed.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.booleandisposable(v=vs.103).aspx">Rx.Net equivalent BooleanDisposable</a>
*/
public class BooleanSubscription implements Subscription {
public final class BooleanSubscription implements Subscription {

private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
private final Action0 action;

public BooleanSubscription() {
action = null;
}

private BooleanSubscription(Action0 action) {
this.action = action;
}

public static BooleanSubscription create() {
return new BooleanSubscription();
}

public static BooleanSubscription create(Action0 onUnsubscribe) {
return new BooleanSubscription(onUnsubscribe);
}

public boolean isUnsubscribed() {
return unsubscribed.get();
}

@Override
public void unsubscribe() {
unsubscribed.set(true);
public final void unsubscribe() {
if (unsubscribed.compareAndSet(false, true)) {
if (action != null) {
action.call();
}
}
}

}
224 changes: 107 additions & 117 deletions rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableSet;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import rx.Subscription;
Expand All @@ -31,106 +27,118 @@
/**
* Subscription that represents a group of Subscriptions that are unsubscribed
* together.
*
* @see <a
* href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net
* equivalent CompositeDisposable</a>
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
*/
public class CompositeSubscription implements Subscription {
/** Sentinel to indicate a thread is modifying the subscription set. */
private static final Set<Subscription> MUTATE_SENTINEL = unmodifiableSet(Collections.<Subscription>emptySet());
/** Sentinel to indicate the entire CompositeSubscription has been unsubscribed.*/
private static final Set<Subscription> UNSUBSCRIBED_SENTINEL = unmodifiableSet(Collections.<Subscription>emptySet());
/** The reference to the set of subscriptions. */
private final AtomicReference<Set<Subscription>> reference = new AtomicReference<Set<Subscription>>();

public final class CompositeSubscription implements Subscription {

private final AtomicReference<State> state = new AtomicReference<State>();

private static final class State {
final boolean isUnsubscribed;
final List<Subscription> subscriptions;

State(boolean u, List<Subscription> s) {
this.isUnsubscribed = u;
this.subscriptions = s;
}

State unsubscribe() {
return new State(true, subscriptions);
}

State add(Subscription s) {
List<Subscription> newSubscriptions = new ArrayList<Subscription>();
newSubscriptions.addAll(subscriptions);
newSubscriptions.add(s);
return new State(isUnsubscribed, newSubscriptions);
}

State remove(Subscription s) {
List<Subscription> newSubscriptions = new ArrayList<Subscription>();
newSubscriptions.addAll(subscriptions);
newSubscriptions.remove(s); // only first occurrence
return new State(isUnsubscribed, newSubscriptions);
}

State clear() {
return new State(isUnsubscribed, new ArrayList<Subscription>());
}
}

public CompositeSubscription(final Subscription... subscriptions) {
reference.set(new HashSet<Subscription>(asList(subscriptions)));
state.set(new State(false, Arrays.asList(subscriptions)));
}

public boolean isUnsubscribed() {
return reference.get() == UNSUBSCRIBED_SENTINEL;
return state.get().isUnsubscribed;
}

public void add(final Subscription s) {
State oldState;
State newState;
do {
final Set<Subscription> existing = reference.get();
if (existing == UNSUBSCRIBED_SENTINEL) {
oldState = state.get();
if (oldState.isUnsubscribed) {
s.unsubscribe();
break;
}

if (existing == MUTATE_SENTINEL) {
continue;
}

if (reference.compareAndSet(existing, MUTATE_SENTINEL)) {
existing.add(s);
reference.set(existing);
break;
return;
} else {
newState = oldState.add(s);
}
} while (true);
} while (!state.compareAndSet(oldState, newState));
}

public void remove(final Subscription s) {
State oldState;
State newState;
do {
final Set<Subscription> subscriptions = reference.get();
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
s.unsubscribe();
break;
}

if (subscriptions == MUTATE_SENTINEL) {
continue;
}

if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) {
// also unsubscribe from it:
// http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
subscriptions.remove(s);
reference.set(subscriptions);
s.unsubscribe();
break;
oldState = state.get();
if (oldState.isUnsubscribed) {
return;
} else {
newState = oldState.remove(s);
}
} while (true);
} while (!state.compareAndSet(oldState, newState));
// if we removed successfully we then need to call unsubscribe on it
s.unsubscribe();
}

public void clear() {
State oldState;
State newState;
do {
final Set<Subscription> subscriptions = reference.get();
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
break;
}

if (subscriptions == MUTATE_SENTINEL) {
continue;
oldState = state.get();
if (oldState.isUnsubscribed) {
return;
} else {
newState = oldState.clear();
}

if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) {
final Set<Subscription> copy = new HashSet<Subscription>(
subscriptions);
subscriptions.clear();
reference.set(subscriptions);

unsubscribeAll(copy);
break;
} while (!state.compareAndSet(oldState, newState));
// if we cleared successfully we then need to call unsubscribe on all previous
unsubscribeFromAll(oldState.subscriptions);
}

@Override
public void unsubscribe() {
State oldState;
State newState;
do {
oldState = state.get();
if (oldState.isUnsubscribed) {
return;
} else {
newState = oldState.unsubscribe();
}
} while (true);
} while (!state.compareAndSet(oldState, newState));
unsubscribeFromAll(oldState.subscriptions);
}
/**
* Unsubscribe from the collection of subscriptions.
* <p>
* Exceptions thrown by any of the {@code unsubscribe()} methods are
* collected into a {@link CompositeException} and thrown once
* all unsubscriptions have been attempted.
* @param subs the collection of subscriptions
*/
private void unsubscribeAll(Collection<Subscription> subs) {

private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
final Collection<Throwable> es = new ArrayList<Throwable>();
for (final Subscription s : subs) {
for (Subscription s : subscriptions) {
try {
s.unsubscribe();
} catch (final Throwable e) {
} catch (Throwable e) {
es.add(e);
}
}
Expand All @@ -139,22 +147,4 @@ private void unsubscribeAll(Collection<Subscription> subs) {
"Failed to unsubscribe to 1 or more subscriptions.", es);
}
}
@Override
public void unsubscribe() {
do {
final Set<Subscription> subscriptions = reference.get();
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
break;
}

if (subscriptions == MUTATE_SENTINEL) {
continue;
}

if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_SENTINEL)) {
unsubscribeAll(subscriptions);
break;
}
} while (true);
}
}
Loading