diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index b11ebd660c..af08f2a1b9 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -15,19 +15,16 @@ */ package rx.internal.operators; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.Queue; +import java.util.concurrent.atomic.*; import rx.Observable.Operator; -import rx.Producer; -import rx.Scheduler; -import rx.Subscriber; -import rx.Subscription; +import rx.*; import rx.exceptions.MissingBackpressureException; import rx.functions.Action0; -import rx.internal.util.RxRingBuffer; -import rx.schedulers.ImmediateScheduler; -import rx.schedulers.TrampolineScheduler; +import rx.internal.util.*; +import rx.internal.util.unsafe.*; +import rx.schedulers.*; /** * Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer. @@ -64,16 +61,15 @@ public Subscriber call(Subscriber child) { /** Observe through individual queue per observer. */ private static final class ObserveOnSubscriber extends Subscriber { final Subscriber child; - private final Scheduler.Worker recursiveScheduler; - private final ScheduledUnsubscribe scheduledUnsubscribe; + final Scheduler.Worker recursiveScheduler; + final ScheduledUnsubscribe scheduledUnsubscribe; final NotificationLite on = NotificationLite.instance(); - private final RxRingBuffer queue = RxRingBuffer.getSpscInstance(); - private boolean completed = false; - private boolean failure = false; + final Queue queue; + volatile boolean completed = false; + volatile boolean failure = false; - @SuppressWarnings("unused") - private volatile long requested = 0; + volatile long requested = 0; @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "requested"); @@ -82,12 +78,19 @@ private static final class ObserveOnSubscriber extends Subscriber { @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter"); + volatile Throwable error; + // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should // not prevent anything downstream from consuming, which will happen if the Subscription is chained public ObserveOnSubscriber(Scheduler scheduler, Subscriber child) { this.child = child; this.recursiveScheduler = scheduler.createWorker(); - this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler, queue); + if (UnsafeAccess.isUnsafeAvailable()) { + queue = new SpscArrayQueue(RxRingBuffer.SIZE); + } else { + queue = new SynchronizedQueue(RxRingBuffer.SIZE); + } + this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler); child.add(scheduledUnsubscribe); child.setProducer(new Producer() { @@ -113,10 +116,8 @@ public void onNext(final T t) { if (isUnsubscribed() || completed) { return; } - try { - queue.onNext(t); - } catch (MissingBackpressureException e) { - onError(e); + if (!queue.offer(on.next(t))) { + onError(new MissingBackpressureException()); return; } schedule(); @@ -127,8 +128,10 @@ public void onCompleted() { if (isUnsubscribed() || completed) { return; } + if (error != null) { + return; + } completed = true; - queue.onCompleted(); schedule(); } @@ -137,53 +140,64 @@ public void onError(final Throwable e) { if (isUnsubscribed() || completed) { return; } + if (error != null) { + return; + } + error = e; // unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event unsubscribe(); - completed = true; // mark failure so the polling thread will skip onNext still in the queue + completed = true; failure = true; - queue.onError(e); schedule(); } - protected void schedule() { - if (COUNTER_UPDATER.getAndIncrement(this) == 0) { - recursiveScheduler.schedule(new Action0() { + final Action0 action = new Action0() { - @Override - public void call() { - pollQueue(); - } + @Override + public void call() { + pollQueue(); + } - }); + }; + + protected void schedule() { + if (COUNTER_UPDATER.getAndIncrement(this) == 0) { + recursiveScheduler.schedule(action); } } // only execute this from schedule() - private void pollQueue() { + void pollQueue() { int emitted = 0; do { /* * Set to 1 otherwise it could have grown very large while in the last poll loop * and then we can end up looping all those times again here before exiting even once we've drained */ - COUNTER_UPDATER.set(this, 1); + counter = 1; +// middle: while (!scheduledUnsubscribe.isUnsubscribed()) { if (failure) { - // special handling to short-circuit an error propagation - Object o = queue.poll(); - // completed so we will skip onNext if they exist and only emit terminal events - if (on.isError(o)) { - // only emit error - on.accept(child, o); - // we have emitted a terminal event so return (exit the loop we're in) + child.onError(error); + return; + } else { + if (requested == 0 && completed && queue.isEmpty()) { + child.onCompleted(); return; } - } else { if (REQUESTED.getAndDecrement(this) != 0) { Object o = queue.poll(); if (o == null) { + if (completed) { + if (failure) { + child.onError(error); + } else { + child.onCompleted(); + } + return; + } // nothing in queue REQUESTED.incrementAndGet(this); break; @@ -213,12 +227,10 @@ static final class ScheduledUnsubscribe implements Subscription { final Scheduler.Worker worker; volatile int once; static final AtomicIntegerFieldUpdater ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ScheduledUnsubscribe.class, "once"); - final RxRingBuffer queue; volatile boolean unsubscribed = false; - public ScheduledUnsubscribe(Scheduler.Worker worker, RxRingBuffer queue) { + public ScheduledUnsubscribe(Scheduler.Worker worker) { this.worker = worker; - this.queue = queue; } @Override diff --git a/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java b/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java index 91a5440227..71c4397754 100644 --- a/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java +++ b/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java @@ -15,15 +15,12 @@ */ package rx.internal.schedulers; -import rx.Scheduler; -import rx.Subscription; -import rx.functions.Action0; -import rx.internal.util.RxThreadFactory; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; +import java.util.concurrent.*; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import rx.*; +import rx.functions.Action0; +import rx.internal.util.*; +import rx.subscriptions.*; public class EventLoopsScheduler extends Scheduler { /** Manages a fixed number of workers. */ @@ -95,7 +92,9 @@ public Subscription scheduleDirect(Action0 action) { } private static class EventLoopWorker extends Scheduler.Worker { - private final CompositeSubscription innerSubscription = new CompositeSubscription(); + private final SubscriptionList serial = new SubscriptionList(); + private final CompositeSubscription timed = new CompositeSubscription(); + private final SubscriptionList both = new SubscriptionList(serial, timed); private final PoolWorker poolWorker; EventLoopWorker(PoolWorker poolWorker) { @@ -105,28 +104,33 @@ private static class EventLoopWorker extends Scheduler.Worker { @Override public void unsubscribe() { - innerSubscription.unsubscribe(); + both.unsubscribe(); } @Override public boolean isUnsubscribed() { - return innerSubscription.isUnsubscribed(); + return both.isUnsubscribed(); } @Override public Subscription schedule(Action0 action) { - return schedule(action, 0, null); + if (isUnsubscribed()) { + return Subscriptions.unsubscribed(); + } + ScheduledAction s = poolWorker.scheduleActual(action, 0, null); + + serial.add(s); + s.addParent(serial); + + return s; } @Override public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { - if (innerSubscription.isUnsubscribed()) { - // don't schedule, we are unsubscribed + if (isUnsubscribed()) { return Subscriptions.unsubscribed(); } + ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit, timed); - ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit); - innerSubscription.add(s); - s.addParent(innerSubscription); return s; } } diff --git a/src/main/java/rx/internal/schedulers/NewThreadWorker.java b/src/main/java/rx/internal/schedulers/NewThreadWorker.java index 41144795cb..094c94892f 100644 --- a/src/main/java/rx/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/rx/internal/schedulers/NewThreadWorker.java @@ -23,9 +23,9 @@ import rx.*; import rx.exceptions.Exceptions; import rx.functions.Action0; -import rx.internal.util.RxThreadFactory; +import rx.internal.util.*; import rx.plugins.*; -import rx.subscriptions.Subscriptions; +import rx.subscriptions.*; /** * @warn class description missing @@ -174,6 +174,37 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time return run; } + public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) { + Action0 decoratedAction = schedulersHook.onSchedule(action); + ScheduledAction run = new ScheduledAction(decoratedAction, parent); + parent.add(run); + + Future f; + if (delayTime <= 0) { + f = executor.submit(run); + } else { + f = executor.schedule(run, delayTime, unit); + } + run.add(f); + + return run; + } + + public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) { + Action0 decoratedAction = schedulersHook.onSchedule(action); + ScheduledAction run = new ScheduledAction(decoratedAction, parent); + parent.add(run); + + Future f; + if (delayTime <= 0) { + f = executor.submit(run); + } else { + f = executor.schedule(run, delayTime, unit); + } + run.add(f); + + return run; + } @Override public void unsubscribe() { diff --git a/src/main/java/rx/internal/schedulers/ScheduledAction.java b/src/main/java/rx/internal/schedulers/ScheduledAction.java index 24240096c9..8ddd18870b 100644 --- a/src/main/java/rx/internal/schedulers/ScheduledAction.java +++ b/src/main/java/rx/internal/schedulers/ScheduledAction.java @@ -16,12 +16,12 @@ package rx.internal.schedulers; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import rx.Subscription; import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action0; +import rx.internal.util.SubscriptionList; import rx.plugins.RxJavaPlugins; import rx.subscriptions.CompositeSubscription; @@ -32,12 +32,20 @@ public final class ScheduledAction extends AtomicReference implements Runnable, Subscription { /** */ private static final long serialVersionUID = -3962399486978279857L; - final CompositeSubscription cancel; + final SubscriptionList cancel; final Action0 action; public ScheduledAction(Action0 action) { this.action = action; - this.cancel = new CompositeSubscription(); + this.cancel = new SubscriptionList(); + } + public ScheduledAction(Action0 action, CompositeSubscription parent) { + this.action = action; + this.cancel = new SubscriptionList(new Remover(this, parent)); + } + public ScheduledAction(Action0 action, SubscriptionList parent) { + this.action = action; + this.cancel = new SubscriptionList(new Remover2(this, parent)); } @Override @@ -103,6 +111,17 @@ public void addParent(CompositeSubscription parent) { cancel.add(new Remover(this, parent)); } + /** + * Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is + * cancelled or terminates, it can remove itself from this parent. + * + * @param parent + * the parent {@code CompositeSubscription} to add + */ + public void addParent(SubscriptionList parent) { + cancel.add(new Remover2(this, parent)); + } + /** * Cancels the captured future if the caller of the call method * is not the same as the runner of the outer ScheduledAction to @@ -134,10 +153,35 @@ public boolean isUnsubscribed() { private static final class Remover extends AtomicBoolean implements Subscription { /** */ private static final long serialVersionUID = 247232374289553518L; - final Subscription s; + final ScheduledAction s; final CompositeSubscription parent; - public Remover(Subscription s, CompositeSubscription parent) { + public Remover(ScheduledAction s, CompositeSubscription parent) { + this.s = s; + this.parent = parent; + } + + @Override + public boolean isUnsubscribed() { + return s.isUnsubscribed(); + } + + @Override + public void unsubscribe() { + if (compareAndSet(false, true)) { + parent.remove(s); + } + } + + } + /** Remove a child subscription from a composite when unsubscribing. */ + private static final class Remover2 extends AtomicBoolean implements Subscription { + /** */ + private static final long serialVersionUID = 247232374289553518L; + final ScheduledAction s; + final SubscriptionList parent; + + public Remover2(ScheduledAction s, SubscriptionList parent) { this.s = s; this.parent = parent; } diff --git a/src/main/java/rx/internal/util/SubscriptionList.java b/src/main/java/rx/internal/util/SubscriptionList.java index 7583131fda..a3a91fa1b0 100644 --- a/src/main/java/rx/internal/util/SubscriptionList.java +++ b/src/main/java/rx/internal/util/SubscriptionList.java @@ -20,9 +20,10 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; import rx.Subscription; -import rx.exceptions.*; +import rx.exceptions.Exceptions; /** * Subscription that represents a group of Subscriptions that are unsubscribed together. @@ -31,8 +32,9 @@ */ public final class SubscriptionList implements Subscription { - private List subscriptions; + private LinkedList subscriptions; private volatile boolean unsubscribed; + private final ReentrantLock lock = new ReentrantLock(); public SubscriptionList() { } @@ -41,6 +43,11 @@ public SubscriptionList(final Subscription... subscriptions) { this.subscriptions = new LinkedList(Arrays.asList(subscriptions)); } + public SubscriptionList(Subscription s) { + this.subscriptions = new LinkedList(); + this.subscriptions.add(s); + } + @Override public boolean isUnsubscribed() { return unsubscribed; @@ -55,21 +62,49 @@ public boolean isUnsubscribed() { * the {@link Subscription} to add */ public void add(final Subscription s) { + if (s.isUnsubscribed()) { + return; + } if (!unsubscribed) { - synchronized (this) { + lock.lock(); + try { if (!unsubscribed) { - if (subscriptions == null) { - subscriptions = new LinkedList(); + LinkedList subs = subscriptions; + if (subs == null) { + subs = new LinkedList(); + subscriptions = subs; } - subscriptions.add(s); + subs.add(s); return; } + } finally { + lock.unlock(); } } // call after leaving the synchronized block so we're not holding a lock while executing this s.unsubscribe(); } + public void remove(final Subscription s) { + if (!unsubscribed) { + boolean unsubscribe = false; + lock.lock(); + try { + LinkedList subs = subscriptions; + if (unsubscribed || subs == null) { + return; + } + unsubscribe = subs.remove(s); + } finally { + lock.unlock(); + } + if (unsubscribe) { + // if we removed successfully we then need to call unsubscribe on it (outside of the lock) + s.unsubscribe(); + } + } + } + /** * Unsubscribe from all of the subscriptions in the list, which stops the receipt of notifications on * the associated {@code Subscriber}. @@ -78,13 +113,16 @@ public void add(final Subscription s) { public void unsubscribe() { if (!unsubscribed) { List list; - synchronized (this) { + lock.lock(); + try { if (unsubscribed) { return; } unsubscribed = true; list = subscriptions; subscriptions = null; + } finally { + lock.unlock(); } // we will only get here once unsubscribeFromAll(list); @@ -112,9 +150,12 @@ private static void unsubscribeFromAll(Collection subscriptions) { public void clear() { if (!unsubscribed) { List list; - synchronized (this) { + lock.lock(); + try { list = subscriptions; subscriptions = null; + } finally { + lock.unlock(); } unsubscribeFromAll(list); } @@ -125,8 +166,11 @@ public void clear() { */ public boolean hasSubscriptions() { if (!unsubscribed) { - synchronized (this) { + lock.lock(); + try { return !unsubscribed && subscriptions != null && !subscriptions.isEmpty(); + } finally { + lock.unlock(); } } return false; diff --git a/src/test/java/rx/internal/operators/OperatorConcatTest.java b/src/test/java/rx/internal/operators/OperatorConcatTest.java index 9c457e41b0..53b6f320a9 100644 --- a/src/test/java/rx/internal/operators/OperatorConcatTest.java +++ b/src/test/java/rx/internal/operators/OperatorConcatTest.java @@ -471,7 +471,7 @@ public void unsubscribe() { @Override public boolean isUnsubscribed() { - return subscribed; + return !subscribed; } }; diff --git a/src/test/java/rx/internal/operators/OperatorReplayTest.java b/src/test/java/rx/internal/operators/OperatorReplayTest.java index 8e6dddce8c..bd15f03b8b 100644 --- a/src/test/java/rx/internal/operators/OperatorReplayTest.java +++ b/src/test/java/rx/internal/operators/OperatorReplayTest.java @@ -607,6 +607,7 @@ public void testIssue2191_SchedulerUnsubscribe() throws Exception { verifyObserverMock(mockObserverBeforeConnect, 2, 6); verifyObserverMock(mockObserverAfterConnect, 2, 6); + verify(spiedWorker, times(1)).isUnsubscribed(); verify(spiedWorker, times(1)).unsubscribe(); verify(sourceUnsubscribed, times(1)).call(); @@ -666,6 +667,7 @@ public void testIssue2191_SchedulerUnsubscribeOnError() throws Exception { verifyObserver(mockObserverBeforeConnect, 2, 2, illegalArgumentException); verifyObserver(mockObserverAfterConnect, 2, 2, illegalArgumentException); + verify(spiedWorker, times(1)).isUnsubscribed(); verify(spiedWorker, times(1)).unsubscribe(); verify(sourceUnsubscribed, times(1)).call();