Skip to content

Commit

Permalink
Merge pull request ReactiveX#661 from benjchristensen/subscriptions
Browse files Browse the repository at this point in the history
Subscriptions Rewrite
  • Loading branch information
benjchristensen committed Dec 23, 2013
2 parents 22f99d3 + 0664235 commit 8abcfad
Show file tree
Hide file tree
Showing 11 changed files with 391 additions and 384 deletions.
17 changes: 12 additions & 5 deletions rxjava-core/src/main/java/rx/joins/JoinObserver1.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Notification;
import rx.Observable;
import rx.subscriptions.SingleAssignmentSubscription;
import rx.operators.SafeObservableSubscription;
import rx.util.functions.Action1;

/**
Expand All @@ -33,14 +35,15 @@ public final class JoinObserver1<T> extends ObserverBase<Notification<T>> implem
private final Action1<Throwable> onError;
private final List<ActivePlan0> activePlans;
private final Queue<Notification<T>> queue;
private final SingleAssignmentSubscription subscription;
private final SafeObservableSubscription subscription;
private volatile boolean done;
private final AtomicBoolean subscribed = new AtomicBoolean(false);

public JoinObserver1(Observable<T> source, Action1<Throwable> onError) {
this.source = source;
this.onError = onError;
queue = new LinkedList<Notification<T>>();
subscription = new SingleAssignmentSubscription();
subscription = new SafeObservableSubscription();
activePlans = new ArrayList<ActivePlan0>();
}
public Queue<Notification<T>> queue() {
Expand All @@ -51,8 +54,12 @@ public void addActivePlan(ActivePlan0 activePlan) {
}
@Override
public void subscribe(Object gate) {
this.gate = gate;
subscription.set(source.materialize().subscribe(this));
if (subscribed.compareAndSet(false, true)) {
this.gate = gate;
subscription.wrap(source.materialize().subscribe(this));
} else {
throw new IllegalStateException("Can only be subscribed to once.");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,45 @@

import rx.Observable;
import rx.Subscription;
import rx.util.functions.Action0;

/**
* Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop if unsubscribed.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.booleandisposable(v=vs.103).aspx">Rx.Net equivalent BooleanDisposable</a>
*/
public class BooleanSubscription implements Subscription {
public final class BooleanSubscription implements Subscription {

private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
private final Action0 action;

public BooleanSubscription() {
action = null;
}

private BooleanSubscription(Action0 action) {
this.action = action;
}

public static BooleanSubscription create() {
return new BooleanSubscription();
}

public static BooleanSubscription create(Action0 onUnsubscribe) {
return new BooleanSubscription(onUnsubscribe);
}

public boolean isUnsubscribed() {
return unsubscribed.get();
}

@Override
public void unsubscribe() {
unsubscribed.set(true);
public final void unsubscribe() {
if (unsubscribed.compareAndSet(false, true)) {
if (action != null) {
action.call();
}
}
}

}
224 changes: 107 additions & 117 deletions rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableSet;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import rx.Subscription;
Expand All @@ -31,106 +27,118 @@
/**
* Subscription that represents a group of Subscriptions that are unsubscribed
* together.
*
* @see <a
* href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net
* equivalent CompositeDisposable</a>
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
*/
public class CompositeSubscription implements Subscription {
/** Sentinel to indicate a thread is modifying the subscription set. */
private static final Set<Subscription> MUTATE_SENTINEL = unmodifiableSet(Collections.<Subscription>emptySet());
/** Sentinel to indicate the entire CompositeSubscription has been unsubscribed.*/
private static final Set<Subscription> UNSUBSCRIBED_SENTINEL = unmodifiableSet(Collections.<Subscription>emptySet());
/** The reference to the set of subscriptions. */
private final AtomicReference<Set<Subscription>> reference = new AtomicReference<Set<Subscription>>();

public final class CompositeSubscription implements Subscription {

private final AtomicReference<State> state = new AtomicReference<State>();

private static final class State {
final boolean isUnsubscribed;
final List<Subscription> subscriptions;

State(boolean u, List<Subscription> s) {
this.isUnsubscribed = u;
this.subscriptions = s;
}

State unsubscribe() {
return new State(true, subscriptions);
}

State add(Subscription s) {
List<Subscription> newSubscriptions = new ArrayList<Subscription>();
newSubscriptions.addAll(subscriptions);
newSubscriptions.add(s);
return new State(isUnsubscribed, newSubscriptions);
}

State remove(Subscription s) {
List<Subscription> newSubscriptions = new ArrayList<Subscription>();
newSubscriptions.addAll(subscriptions);
newSubscriptions.remove(s); // only first occurrence
return new State(isUnsubscribed, newSubscriptions);
}

State clear() {
return new State(isUnsubscribed, new ArrayList<Subscription>());
}
}

public CompositeSubscription(final Subscription... subscriptions) {
reference.set(new HashSet<Subscription>(asList(subscriptions)));
state.set(new State(false, Arrays.asList(subscriptions)));
}

public boolean isUnsubscribed() {
return reference.get() == UNSUBSCRIBED_SENTINEL;
return state.get().isUnsubscribed;
}

public void add(final Subscription s) {
State oldState;
State newState;
do {
final Set<Subscription> existing = reference.get();
if (existing == UNSUBSCRIBED_SENTINEL) {
oldState = state.get();
if (oldState.isUnsubscribed) {
s.unsubscribe();
break;
}

if (existing == MUTATE_SENTINEL) {
continue;
}

if (reference.compareAndSet(existing, MUTATE_SENTINEL)) {
existing.add(s);
reference.set(existing);
break;
return;
} else {
newState = oldState.add(s);
}
} while (true);
} while (!state.compareAndSet(oldState, newState));
}

public void remove(final Subscription s) {
State oldState;
State newState;
do {
final Set<Subscription> subscriptions = reference.get();
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
s.unsubscribe();
break;
}

if (subscriptions == MUTATE_SENTINEL) {
continue;
}

if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) {
// also unsubscribe from it:
// http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
subscriptions.remove(s);
reference.set(subscriptions);
s.unsubscribe();
break;
oldState = state.get();
if (oldState.isUnsubscribed) {
return;
} else {
newState = oldState.remove(s);
}
} while (true);
} while (!state.compareAndSet(oldState, newState));
// if we removed successfully we then need to call unsubscribe on it
s.unsubscribe();
}

public void clear() {
State oldState;
State newState;
do {
final Set<Subscription> subscriptions = reference.get();
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
break;
}

if (subscriptions == MUTATE_SENTINEL) {
continue;
oldState = state.get();
if (oldState.isUnsubscribed) {
return;
} else {
newState = oldState.clear();
}

if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) {
final Set<Subscription> copy = new HashSet<Subscription>(
subscriptions);
subscriptions.clear();
reference.set(subscriptions);

unsubscribeAll(copy);
break;
} while (!state.compareAndSet(oldState, newState));
// if we cleared successfully we then need to call unsubscribe on all previous
unsubscribeFromAll(oldState.subscriptions);
}

@Override
public void unsubscribe() {
State oldState;
State newState;
do {
oldState = state.get();
if (oldState.isUnsubscribed) {
return;
} else {
newState = oldState.unsubscribe();
}
} while (true);
} while (!state.compareAndSet(oldState, newState));
unsubscribeFromAll(oldState.subscriptions);
}
/**
* Unsubscribe from the collection of subscriptions.
* <p>
* Exceptions thrown by any of the {@code unsubscribe()} methods are
* collected into a {@link CompositeException} and thrown once
* all unsubscriptions have been attempted.
* @param subs the collection of subscriptions
*/
private void unsubscribeAll(Collection<Subscription> subs) {

private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
final Collection<Throwable> es = new ArrayList<Throwable>();
for (final Subscription s : subs) {
for (Subscription s : subscriptions) {
try {
s.unsubscribe();
} catch (final Throwable e) {
} catch (Throwable e) {
es.add(e);
}
}
Expand All @@ -139,22 +147,4 @@ private void unsubscribeAll(Collection<Subscription> subs) {
"Failed to unsubscribe to 1 or more subscriptions.", es);
}
}
@Override
public void unsubscribe() {
do {
final Set<Subscription> subscriptions = reference.get();
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
break;
}

if (subscriptions == MUTATE_SENTINEL) {
continue;
}

if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_SENTINEL)) {
unsubscribeAll(subscriptions);
break;
}
} while (true);
}
}
Loading

0 comments on commit 8abcfad

Please sign in to comment.