diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java index ecdc064dde..19b0a5eec6 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java @@ -63,31 +63,7 @@ public void subscribeActual(final CompletableObserver s) { final AtomicBoolean once = new AtomicBoolean(); - CompletableObserver inner = new CompletableObserver() { - @Override - public void onComplete() { - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onComplete(); - } - } - - @Override - public void onError(Throwable e) { - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onError(e); - } else { - RxJavaPlugins.onError(e); - } - } - - @Override - public void onSubscribe(Disposable d) { - set.add(d); - } - - }; + CompletableObserver inner = new Amb(once, set, s); for (int i = 0; i < count; i++) { CompletableSource c = sources[i]; @@ -113,4 +89,40 @@ public void onSubscribe(Disposable d) { s.onComplete(); } } + + static final class Amb implements CompletableObserver { + private final AtomicBoolean once; + private final CompositeDisposable set; + private final CompletableObserver s; + + Amb(AtomicBoolean once, CompositeDisposable set, CompletableObserver s) { + this.once = once; + this.set = set; + this.s = s; + } + + @Override + public void onComplete() { + if (once.compareAndSet(false, true)) { + set.dispose(); + s.onComplete(); + } + } + + @Override + public void onError(Throwable e) { + if (once.compareAndSet(false, true)) { + set.dispose(); + s.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + } } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java index 72737f3155..059cc1c7e2 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java @@ -42,36 +42,53 @@ public CompletableDelay(CompletableSource source, long delay, TimeUnit unit, Sch protected void subscribeActual(final CompletableObserver s) { final CompositeDisposable set = new CompositeDisposable(); - source.subscribe(new CompletableObserver() { + source.subscribe(new Delay(set, s)); + } + + final class Delay implements CompletableObserver { + + private final CompositeDisposable set; + private final CompletableObserver s; + + Delay(CompositeDisposable set, CompletableObserver s) { + this.set = set; + this.s = s; + } + + @Override + public void onComplete() { + set.add(scheduler.scheduleDirect(new OnComplete(), delay, unit)); + } + @Override + public void onError(final Throwable e) { + set.add(scheduler.scheduleDirect(new OnError(e), delayError ? delay : 0, unit)); + } + @Override + public void onSubscribe(Disposable d) { + set.add(d); + s.onSubscribe(set); + } + + final class OnComplete implements Runnable { @Override - public void onComplete() { - set.add(scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - s.onComplete(); - } - }, delay, unit)); + public void run() { + s.onComplete(); } + } - @Override - public void onError(final Throwable e) { - set.add(scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - s.onError(e); - } - }, delayError ? delay : 0, unit)); + final class OnError implements Runnable { + private final Throwable e; + + OnError(Throwable e) { + this.e = e; } @Override - public void onSubscribe(Disposable d) { - set.add(d); - s.onSubscribe(set); + public void run() { + s.onError(e); } - - }); + } } - } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableDoOnEvent.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableDoOnEvent.java index ff275481f5..19d472af94 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableDoOnEvent.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableDoOnEvent.java @@ -32,36 +32,44 @@ public CompletableDoOnEvent(final CompletableSource source, final Consumer s) { - source.subscribe(new CompletableObserver() { - - @Override - public void onComplete() { - T v; - - if (completionValueSupplier != null) { - try { - v = completionValueSupplier.call(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - s.onError(e); - return; - } - } else { - v = completionValue; - } + source.subscribe(new ToSingle(s)); + } + + final class ToSingle implements CompletableObserver { + + private final SingleObserver observer; + + ToSingle(SingleObserver observer) { + this.observer = observer; + } - if (v == null) { - s.onError(new NullPointerException("The value supplied is null")); - } else { - s.onSuccess(v); + @Override + public void onComplete() { + T v; + + if (completionValueSupplier != null) { + try { + v = completionValueSupplier.call(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + observer.onError(e); + return; } + } else { + v = completionValue; } - @Override - public void onError(Throwable e) { - s.onError(e); + if (v == null) { + observer.onError(new NullPointerException("The value supplied is null")); + } else { + observer.onSuccess(v); } + } - @Override - public void onSubscribe(Disposable d) { - s.onSubscribe(d); - } + @Override + public void onError(Throwable e) { + observer.onError(e); + } - }); - } + @Override + public void onSubscribe(Disposable d) { + observer.onSubscribe(d); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java index cd1493d53f..298a3f21be 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java @@ -78,44 +78,46 @@ public void onNext(T args) { * thread expect {@link Iterator#next()} called from a different thread to work. * @return the Iterator */ - public Iterator getIterable() { - return new Iterator() { - /** - * buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next(). - */ - private Object buf; - - @Override - public boolean hasNext() { - buf = value; - return !NotificationLite.isComplete(buf); - } + public Iterator getIterable() { + return new Iterator(); + } - @Override - public T next() { - try { - // if hasNext wasn't called before calling next. - if (buf == null) { - buf = value; - } - if (NotificationLite.isComplete(buf)) { - throw new NoSuchElementException(); - } - if (NotificationLite.isError(buf)) { - throw ExceptionHelper.wrapOrThrow(NotificationLite.getError(buf)); - } - return NotificationLite.getValue(buf); + final class Iterator implements java.util.Iterator { + /** + * buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next(). + */ + private Object buf; + + @Override + public boolean hasNext() { + buf = value; + return !NotificationLite.isComplete(buf); + } + + @Override + public T next() { + try { + // if hasNext wasn't called before calling next. + if (buf == null) { + buf = value; + } + if (NotificationLite.isComplete(buf)) { + throw new NoSuchElementException(); } - finally { - buf = null; + if (NotificationLite.isError(buf)) { + throw ExceptionHelper.wrapOrThrow(NotificationLite.getError(buf)); } + return NotificationLite.getValue(buf); } - - @Override - public void remove() { - throw new UnsupportedOperationException("Read only iterator"); + finally { + buf = null; } - }; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read only iterator"); + } } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java index 40810550ff..e5ecf84f49 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java @@ -280,16 +280,7 @@ public void onSubscribe(Subscription s) { w.schedulePeriodically(this, timeskip, timeskip, unit); - w.schedule(new Runnable() { - @Override - public void run() { - synchronized (BufferSkipBoundedSubscriber.this) { - buffers.remove(b); - } - - fastPathOrderedEmitMax(b, false, w); - } - }, timespan, unit); + w.schedule(new RemoveFromBuffer(b), timespan, unit); } @Override @@ -367,16 +358,7 @@ public void run() { buffers.add(b); } - w.schedule(new Runnable() { - @Override - public void run() { - synchronized (BufferSkipBoundedSubscriber.this) { - buffers.remove(b); - } - - fastPathOrderedEmitMax(b, false, w); - } - }, timespan, unit); + w.schedule(new RemoveFromBuffer(b), timespan, unit); } @Override @@ -384,6 +366,23 @@ public boolean accept(Subscriber a, U v) { a.onNext(v); return true; } + + final class RemoveFromBuffer implements Runnable { + private final U buffer; + + RemoveFromBuffer(U buffer) { + this.buffer = buffer; + } + + @Override + public void run() { + synchronized (BufferSkipBoundedSubscriber.this) { + buffers.remove(buffer); + } + + fastPathOrderedEmitMax(buffer, false, w); + } + } } static final class BufferExactBoundedSubscriber> diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java index 976ee03ac6..d332fb5e35 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java @@ -132,12 +132,7 @@ public void subscribeActual(Subscriber s) { return; } if (n == 1) { - ((Publisher)a[0]).subscribe(new MapSubscriber(s, new Function() { - @Override - public R apply(T t) throws Exception { - return combiner.apply(new Object[] { t }); - } - })); + ((Publisher)a[0]).subscribe(new MapSubscriber(s, new SingletonArrayFunc())); return; } @@ -557,4 +552,11 @@ public void requestOne() { } } + + final class SingletonArrayFunc implements Function { + @Override + public R apply(T t) throws Exception { + return combiner.apply(new Object[] { t }); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelay.java index 8241980eae..3732c48d63 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelay.java @@ -78,40 +78,17 @@ public void onSubscribe(Subscription s) { @Override public void onNext(final T t) { - w.schedule(new Runnable() { - @Override - public void run() { - actual.onNext(t); - } - }, delay, unit); + w.schedule(new OnNext(t), delay, unit); } @Override public void onError(final Throwable t) { - w.schedule(new Runnable() { - @Override - public void run() { - try { - actual.onError(t); - } finally { - w.dispose(); - } - } - }, delayError ? delay : 0, unit); + w.schedule(new OnError(t), delayError ? delay : 0, unit); } @Override public void onComplete() { - w.schedule(new Runnable() { - @Override - public void run() { - try { - actual.onComplete(); - } finally { - w.dispose(); - } - } - }, delay, unit); + w.schedule(new OnComplete(), delay, unit); } @Override @@ -125,5 +102,45 @@ public void cancel() { w.dispose(); } + final class OnNext implements Runnable { + private final T t; + + OnNext(T t) { + this.t = t; + } + + @Override + public void run() { + actual.onNext(t); + } + } + + final class OnError implements Runnable { + private final Throwable t; + + OnError(Throwable t) { + this.t = t; + } + + @Override + public void run() { + try { + actual.onError(t); + } finally { + w.dispose(); + } + } + } + + final class OnComplete implements Runnable { + @Override + public void run() { + try { + actual.onComplete(); + } finally { + w.dispose(); + } + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOther.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOther.java index deda0ecaa0..75aaf9b21c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOther.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOther.java @@ -38,71 +38,90 @@ public void subscribeActual(final Subscriber child) { final SubscriptionArbiter serial = new SubscriptionArbiter(); child.onSubscribe(serial); - FlowableSubscriber otherSubscriber = new FlowableSubscriber() { - boolean done; + FlowableSubscriber otherSubscriber = new DelaySubscriber(serial, child); + + other.subscribe(otherSubscriber); + } + + final class DelaySubscriber implements FlowableSubscriber { + private final SubscriptionArbiter serial; + private final Subscriber child; + boolean done; + + DelaySubscriber(SubscriptionArbiter serial, Subscriber child) { + this.serial = serial; + this.child = child; + } + + @Override + public void onSubscribe(final Subscription s) { + serial.setSubscription(new DelaySubscription(s)); + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(U t) { + onComplete(); + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPlugins.onError(e); + return; + } + done = true; + child.onError(e); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + + main.subscribe(new OnCompleteSubscriber()); + } + + final class DelaySubscription implements Subscription { + private final Subscription s; + + public DelaySubscription(Subscription s) { + this.s = s; + } @Override - public void onSubscribe(final Subscription s) { - serial.setSubscription(new Subscription() { - @Override - public void request(long n) { - // ignored - } - - @Override - public void cancel() { - s.cancel(); - } - }); - s.request(Long.MAX_VALUE); + public void request(long n) { + // ignored } @Override - public void onNext(U t) { - onComplete(); + public void cancel() { + s.cancel(); } + } + final class OnCompleteSubscriber implements FlowableSubscriber { @Override - public void onError(Throwable e) { - if (done) { - RxJavaPlugins.onError(e); - return; - } - done = true; - child.onError(e); + public void onSubscribe(Subscription s) { + serial.setSubscription(s); } @Override - public void onComplete() { - if (done) { - return; - } - done = true; - - main.subscribe(new FlowableSubscriber() { - @Override - public void onSubscribe(Subscription s) { - serial.setSubscription(s); - } - - @Override - public void onNext(T t) { - child.onNext(t); - } - - @Override - public void onError(Throwable t) { - child.onError(t); - } - - @Override - public void onComplete() { - child.onComplete(); - } - }); + public void onNext(T t) { + child.onNext(t); } - }; - other.subscribe(otherSubscriber); + @Override + public void onError(Throwable t) { + child.onError(t); + } + + @Override + public void onComplete() { + child.onComplete(); + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java index 0f51ff48fe..200b1e042f 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java @@ -193,48 +193,23 @@ public static Function> flatMapIntoIterable(final Functio } public static Callable> replayCallable(final Flowable parent) { - return new Callable>() { - @Override - public ConnectableFlowable call() { - return parent.replay(); - } - }; + return new ReplayCallable(parent); } public static Callable> replayCallable(final Flowable parent, final int bufferSize) { - return new Callable>() { - @Override - public ConnectableFlowable call() { - return parent.replay(bufferSize); - } - }; + return new BufferedReplayCallable(parent, bufferSize); } public static Callable> replayCallable(final Flowable parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) { - return new Callable>() { - @Override - public ConnectableFlowable call() { - return parent.replay(bufferSize, time, unit, scheduler); - } - }; + return new BufferedTimedReplay(parent, bufferSize, time, unit, scheduler); } public static Callable> replayCallable(final Flowable parent, final long time, final TimeUnit unit, final Scheduler scheduler) { - return new Callable>() { - @Override - public ConnectableFlowable call() { - return parent.replay(time, unit, scheduler); - } - }; + return new TimedReplay(parent, time, unit, scheduler); } public static Function, Publisher> replayFunction(final Function, ? extends Publisher> selector, final Scheduler scheduler) { - return new Function, Publisher>() { - @Override - public Publisher apply(Flowable t) throws Exception { - return Flowable.fromPublisher(selector.apply(t)).observeOn(scheduler); - } - }; + return new ReplayFunction(selector, scheduler); } public enum RequestMax implements Consumer { @@ -263,4 +238,86 @@ public static Function>, Publisher(zipper); } + static final class ReplayCallable implements Callable> { + private final Flowable parent; + + public ReplayCallable(Flowable parent) { + this.parent = parent; + } + + @Override + public ConnectableFlowable call() { + return parent.replay(); + } + } + + static final class BufferedReplayCallable implements Callable> { + private final Flowable parent; + private final int bufferSize; + + BufferedReplayCallable(Flowable parent, int bufferSize) { + this.parent = parent; + this.bufferSize = bufferSize; + } + + @Override + public ConnectableFlowable call() { + return parent.replay(bufferSize); + } + } + + static final class BufferedTimedReplay implements Callable> { + private final Flowable parent; + private final int bufferSize; + private final long time; + private final TimeUnit unit; + private final Scheduler scheduler; + + BufferedTimedReplay(Flowable parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler) { + this.parent = parent; + this.bufferSize = bufferSize; + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public ConnectableFlowable call() { + return parent.replay(bufferSize, time, unit, scheduler); + } + } + + static final class TimedReplay implements Callable> { + private final Flowable parent; + private final long time; + private final TimeUnit unit; + private final Scheduler scheduler; + + TimedReplay(Flowable parent, long time, TimeUnit unit, Scheduler scheduler) { + this.parent = parent; + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public ConnectableFlowable call() { + return parent.replay(time, unit, scheduler); + } + } + + static final class ReplayFunction implements Function, Publisher> { + private final Function, ? extends Publisher> selector; + private final Scheduler scheduler; + + public ReplayFunction(Function, ? extends Publisher> selector, Scheduler scheduler) { + this.selector = selector; + this.scheduler = scheduler; + } + + @Override + public Publisher apply(Flowable t) throws Exception { + return Flowable.fromPublisher(selector.apply(t)).observeOn(scheduler); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java index 803b16fbc6..147f9ecc11 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java @@ -60,71 +60,7 @@ public final class FlowablePublish extends ConnectableFlowable implements public static ConnectableFlowable create(Flowable source, final int bufferSize) { // the current connection to source needs to be shared between the operator and its onSubscribe call final AtomicReference> curr = new AtomicReference>(); - Publisher onSubscribe = new Publisher() { - @Override - public void subscribe(Subscriber child) { - // create the backpressure-managing producer for this child - InnerSubscriber inner = new InnerSubscriber(child); - child.onSubscribe(inner); - // concurrent connection/disconnection may change the state, - // we loop to be atomic while the child subscribes - for (;;) { - // get the current subscriber-to-source - PublishSubscriber r = curr.get(); - // if there isn't one or it is cancelled/disposed - if (r == null || r.isDisposed()) { - // create a new subscriber to source - PublishSubscriber u = new PublishSubscriber(curr, bufferSize); - // let's try setting it as the current subscriber-to-source - if (!curr.compareAndSet(r, u)) { - // didn't work, maybe someone else did it or the current subscriber - // to source has just finished - continue; - } - // we won, let's use it going onwards - r = u; - } - - /* - * Try adding it to the current subscriber-to-source, add is atomic in respect - * to other adds and the termination of the subscriber-to-source. - */ - if (r.add(inner)) { - if (inner.get() == CANCELLED) { - r.remove(inner); - } else { - inner.parent = r; - } - r.dispatch(); - break; // NOPMD - } - /* - * The current PublishSubscriber has been terminated, try with a newer one. - */ - /* - * Note: although technically correct, concurrent disconnects can cause - * unexpected behavior such as child subscribers never receiving anything - * (unless connected again). An alternative approach, similar to - * PublishProcessor would be to immediately terminate such child - * subscribers as well: - * - * Object term = r.terminalEvent; - * if (r.nl.isCompleted(term)) { - * child.onComplete(); - * } else { - * child.onError(r.nl.getError(term)); - * } - * return; - * - * The original concurrent behavior was non-deterministic in this regard as well. - * Allowing this behavior, however, may introduce another unexpected behavior: - * after disconnecting a previous connection, one might not be able to prepare - * a new connection right after a previous termination by subscribing new child - * subscribers asynchronously before a connect call. - */ - } - } - }; + Publisher onSubscribe = new FlowablePublisher(curr, bufferSize); return RxJavaPlugins.onAssembly(new FlowablePublish(onSubscribe, source, curr, bufferSize)); } @@ -685,4 +621,78 @@ public void cancel() { } } } + + static final class FlowablePublisher implements Publisher { + private final AtomicReference> curr; + private final int bufferSize; + + FlowablePublisher(AtomicReference> curr, int bufferSize) { + this.curr = curr; + this.bufferSize = bufferSize; + } + + @Override + public void subscribe(Subscriber child) { + // create the backpressure-managing producer for this child + InnerSubscriber inner = new InnerSubscriber(child); + child.onSubscribe(inner); + // concurrent connection/disconnection may change the state, + // we loop to be atomic while the child subscribes + for (;;) { + // get the current subscriber-to-source + PublishSubscriber r = curr.get(); + // if there isn't one or it is cancelled/disposed + if (r == null || r.isDisposed()) { + // create a new subscriber to source + PublishSubscriber u = new PublishSubscriber(curr, bufferSize); + // let's try setting it as the current subscriber-to-source + if (!curr.compareAndSet(r, u)) { + // didn't work, maybe someone else did it or the current subscriber + // to source has just finished + continue; + } + // we won, let's use it going onwards + r = u; + } + + /* + * Try adding it to the current subscriber-to-source, add is atomic in respect + * to other adds and the termination of the subscriber-to-source. + */ + if (r.add(inner)) { + if (inner.get() == CANCELLED) { + r.remove(inner); + } else { + inner.parent = r; + } + r.dispatch(); + break; // NOPMD + } + /* + * The current PublishSubscriber has been terminated, try with a newer one. + */ + /* + * Note: although technically correct, concurrent disconnects can cause + * unexpected behavior such as child subscribers never receiving anything + * (unless connected again). An alternative approach, similar to + * PublishProcessor would be to immediately terminate such child + * subscribers as well: + * + * Object term = r.terminalEvent; + * if (r.nl.isCompleted(term)) { + * child.onComplete(); + * } else { + * child.onError(r.nl.getError(term)); + * } + * return; + * + * The original concurrent behavior was non-deterministic in this regard as well. + * Allowing this behavior, however, may introduce another unexpected behavior: + * after disconnecting a previous connection, one might not be able to prepare + * a new connection right after a previous termination by subscribing new child + * subscribers asynchronously before a connect call. + */ + } + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java index e93cea3ef0..2560071d7c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java @@ -156,20 +156,7 @@ public void subscribeActual(final Subscriber subscriber) { private Consumer onSubscribe(final Subscriber subscriber, final AtomicBoolean writeLocked) { - return new Consumer() { - @Override - public void accept(Disposable subscription) { - try { - baseDisposable.add(subscription); - // ready to subscribe to source so do it - doSubscribe(subscriber, baseDisposable); - } finally { - // release the write lock - lock.unlock(); - writeLocked.set(false); - } - } - }; + return new DisposeConsumer(subscriber, writeLocked); } void doSubscribe(final Subscriber subscriber, final CompositeDisposable currentBase) { @@ -183,23 +170,54 @@ void doSubscribe(final Subscriber subscriber, final CompositeDisposab } private Disposable disconnect(final CompositeDisposable current) { - return Disposables.fromRunnable(new Runnable() { - @Override - public void run() { - lock.lock(); - try { - if (baseDisposable == current) { - if (subscriptionCount.decrementAndGet() == 0) { - baseDisposable.dispose(); - // need a new baseDisposable because once - // disposed stays that way - baseDisposable = new CompositeDisposable(); - } + return Disposables.fromRunnable(new DisposeTask(current)); + } + + final class DisposeConsumer implements Consumer { + private final Subscriber subscriber; + private final AtomicBoolean writeLocked; + + DisposeConsumer(Subscriber subscriber, AtomicBoolean writeLocked) { + this.subscriber = subscriber; + this.writeLocked = writeLocked; + } + + @Override + public void accept(Disposable subscription) { + try { + baseDisposable.add(subscription); + // ready to subscribe to source so do it + doSubscribe(subscriber, baseDisposable); + } finally { + // release the write lock + lock.unlock(); + writeLocked.set(false); + } + } + } + + final class DisposeTask implements Runnable { + private final CompositeDisposable current; + + DisposeTask(CompositeDisposable current) { + this.current = current; + } + + @Override + public void run() { + lock.lock(); + try { + if (baseDisposable == current) { + if (subscriptionCount.decrementAndGet() == 0) { + baseDisposable.dispose(); + // need a new baseDisposable because once + // disposed stays that way + baseDisposable = new CompositeDisposable(); } - } finally { - lock.unlock(); } + } finally { + lock.unlock(); } - }); + } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index a630688a7a..05541f0bbe 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -43,12 +43,7 @@ public final class FlowableReplay extends ConnectableFlowable implements H final Publisher onSubscribe; @SuppressWarnings("rawtypes") - static final Callable DEFAULT_UNBOUNDED_FACTORY = new Callable() { - @Override - public Object call() { - return new UnboundedReplayBuffer(16); - } - }; + static final Callable DEFAULT_UNBOUNDED_FACTORY = new DefaultUnboundedFactory(); /** * Given a connectable observable factory, it multicasts over the generated @@ -62,39 +57,7 @@ public Object call() { public static Flowable multicastSelector( final Callable> connectableFactory, final Function, ? extends Publisher> selector) { - return Flowable.unsafeCreate(new Publisher() { - @Override - public void subscribe(Subscriber child) { - ConnectableFlowable co; - try { - co = ObjectHelper.requireNonNull(connectableFactory.call(), "The connectableFactory returned null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - EmptySubscription.error(e, child); - return; - } - - Publisher observable; - try { - observable = ObjectHelper.requireNonNull(selector.apply(co), "The selector returned a null Publisher"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - EmptySubscription.error(e, child); - return; - } - - final SubscriberResourceWrapper srw = new SubscriberResourceWrapper(child); - - observable.subscribe(srw); - - co.connect(new Consumer() { - @Override - public void accept(Disposable r) { - srw.setResource(r); - } - }); - } - }); + return Flowable.unsafeCreate(new MultiCastPublisher(connectableFactory, selector)); } /** @@ -107,17 +70,7 @@ public void accept(Disposable r) { */ public static ConnectableFlowable observeOn(final ConnectableFlowable co, final Scheduler scheduler) { final Flowable observable = co.observeOn(scheduler); - return RxJavaPlugins.onAssembly(new ConnectableFlowable() { - @Override - public void connect(Consumer connection) { - co.connect(connection); - } - - @Override - protected void subscribeActual(Subscriber s) { - observable.subscribe(s); - } - }); + return RxJavaPlugins.onAssembly(new ConnectableFlowableReplay(co, observable)); } /** @@ -143,12 +96,7 @@ public static ConnectableFlowable create(Flowable source, if (bufferSize == Integer.MAX_VALUE) { return createFrom(source); } - return create(source, new Callable>() { - @Override - public ReplayBuffer call() { - return new SizeBoundReplayBuffer(bufferSize); - } - }); + return create(source, new ReplayBufferTask(bufferSize)); } /** @@ -177,12 +125,7 @@ public static ConnectableFlowable create(Flowable source, */ public static ConnectableFlowable create(Flowable source, final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize) { - return create(source, new Callable>() { - @Override - public ReplayBuffer call() { - return new SizeAndTimeBoundReplayBuffer(bufferSize, maxAge, unit, scheduler); - } - }); + return create(source, new ScheduledReplayBufferTask(bufferSize, maxAge, unit, scheduler)); } /** @@ -195,62 +138,7 @@ static ConnectableFlowable create(Flowable source, final Callable> bufferFactory) { // the current connection to source needs to be shared between the operator and its onSubscribe call final AtomicReference> curr = new AtomicReference>(); - Publisher onSubscribe = new Publisher() { - @Override - public void subscribe(Subscriber child) { - // concurrent connection/disconnection may change the state, - // we loop to be atomic while the child subscribes - for (;;) { - // get the current subscriber-to-source - ReplaySubscriber r = curr.get(); - // if there isn't one - if (r == null) { - ReplayBuffer buf; - - try { - buf = bufferFactory.call(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - throw ExceptionHelper.wrapOrThrow(ex); - } - // create a new subscriber to source - ReplaySubscriber u = new ReplaySubscriber(buf); - // let's try setting it as the current subscriber-to-source - if (!curr.compareAndSet(null, u)) { - // didn't work, maybe someone else did it or the current subscriber - // to source has just finished - continue; - } - // we won, let's use it going onwards - r = u; - } - - // create the backpressure-managing producer for this child - InnerSubscription inner = new InnerSubscription(r, child); - // the producer has been registered with the current subscriber-to-source so - // at least it will receive the next terminal event - // setting the producer will trigger the first request to be considered by - // the subscriber-to-source. - child.onSubscribe(inner); - // we try to add it to the array of subscribers - // if it fails, no worries because we will still have its buffer - // so it is going to replay it for us - r.add(inner); - - if (inner.isDisposed()) { - r.remove(inner); - return; - } - - r.manageRequests(); - - // trigger the capturing of the current node and total requested - r.buffer.replay(inner); - - break; // NOPMD - } - } - }; + Publisher onSubscribe = new ReplayPublisher(curr, bufferFactory); return RxJavaPlugins.onAssembly(new FlowableReplay(onSubscribe, source, curr, bufferFactory)); } @@ -1200,4 +1088,178 @@ Node getHead() { return prev; } } + + static final class MultiCastPublisher implements Publisher { + private final Callable> connectableFactory; + private final Function, ? extends Publisher> selector; + + MultiCastPublisher(Callable> connectableFactory, Function, ? extends Publisher> selector) { + this.connectableFactory = connectableFactory; + this.selector = selector; + } + + @Override + public void subscribe(Subscriber child) { + ConnectableFlowable co; + try { + co = ObjectHelper.requireNonNull(connectableFactory.call(), "The connectableFactory returned null"); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + EmptySubscription.error(e, child); + return; + } + + Publisher observable; + try { + observable = ObjectHelper.requireNonNull(selector.apply(co), "The selector returned a null Publisher"); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + EmptySubscription.error(e, child); + return; + } + + final SubscriberResourceWrapper srw = new SubscriberResourceWrapper(child); + + observable.subscribe(srw); + + co.connect(new DisposableConsumer(srw)); + } + + final class DisposableConsumer implements Consumer { + private final SubscriberResourceWrapper srw; + + DisposableConsumer(SubscriberResourceWrapper srw) { + this.srw = srw; + } + + @Override + public void accept(Disposable r) { + srw.setResource(r); + } + } + } + + static final class ConnectableFlowableReplay extends ConnectableFlowable { + private final ConnectableFlowable co; + private final Flowable observable; + + ConnectableFlowableReplay(ConnectableFlowable co, Flowable observable) { + this.co = co; + this.observable = observable; + } + + @Override + public void connect(Consumer connection) { + co.connect(connection); + } + + @Override + protected void subscribeActual(Subscriber s) { + observable.subscribe(s); + } + } + + static final class ReplayBufferTask implements Callable> { + private final int bufferSize; + + ReplayBufferTask(int bufferSize) { + this.bufferSize = bufferSize; + } + + @Override + public ReplayBuffer call() { + return new SizeBoundReplayBuffer(bufferSize); + } + } + + static final class ScheduledReplayBufferTask implements Callable> { + private final int bufferSize; + private final long maxAge; + private final TimeUnit unit; + private final Scheduler scheduler; + + ScheduledReplayBufferTask(int bufferSize, long maxAge, TimeUnit unit, Scheduler scheduler) { + this.bufferSize = bufferSize; + this.maxAge = maxAge; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public ReplayBuffer call() { + return new SizeAndTimeBoundReplayBuffer(bufferSize, maxAge, unit, scheduler); + } + } + + static final class ReplayPublisher implements Publisher { + private final AtomicReference> curr; + private final Callable> bufferFactory; + + ReplayPublisher(AtomicReference> curr, Callable> bufferFactory) { + this.curr = curr; + this.bufferFactory = bufferFactory; + } + + @Override + public void subscribe(Subscriber child) { + // concurrent connection/disconnection may change the state, + // we loop to be atomic while the child subscribes + for (;;) { + // get the current subscriber-to-source + ReplaySubscriber r = curr.get(); + // if there isn't one + if (r == null) { + ReplayBuffer buf; + + try { + buf = bufferFactory.call(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } + // create a new subscriber to source + ReplaySubscriber u = new ReplaySubscriber(buf); + // let's try setting it as the current subscriber-to-source + if (!curr.compareAndSet(null, u)) { + // didn't work, maybe someone else did it or the current subscriber + // to source has just finished + continue; + } + // we won, let's use it going onwards + r = u; + } + + // create the backpressure-managing producer for this child + InnerSubscription inner = new InnerSubscription(r, child); + // the producer has been registered with the current subscriber-to-source so + // at least it will receive the next terminal event + // setting the producer will trigger the first request to be considered by + // the subscriber-to-source. + child.onSubscribe(inner); + // we try to add it to the array of subscribers + // if it fails, no worries because we will still have its buffer + // so it is going to replay it for us + r.add(inner); + + if (inner.isDisposed()) { + r.remove(inner); + return; + } + + r.manageRequests(); + + // trigger the capturing of the current node and total requested + r.buffer.replay(inner); + + break; // NOPMD + } + } + } + + static final class DefaultUnboundedFactory implements Callable { + @Override + public Object call() { + return new UnboundedReplayBuffer(16); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java index c99f7ffa1a..cd0c50e1e6 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java @@ -132,12 +132,7 @@ void requestUpstream(final long n, final Subscription s) { if (nonScheduledRequests || Thread.currentThread() == get()) { s.request(n); } else { - worker.schedule(new Runnable() { - @Override - public void run() { - s.request(n); - } - }); + worker.schedule(new Request(s, n)); } } @@ -146,5 +141,20 @@ public void cancel() { SubscriptionHelper.cancel(s); worker.dispose(); } + + static final class Request implements Runnable { + private final Subscription s; + private final long n; + + Request(Subscription s, long n) { + this.s = s; + this.n = n; + } + + @Override + public void run() { + s.request(n); + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java index 557b115b10..12a9acc060 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java @@ -33,15 +33,7 @@ public final class FlowableTimeoutTimed extends AbstractFlowableWithUpstream< final Scheduler scheduler; final Publisher other; - static final Disposable NEW_TIMER = new Disposable() { - @Override - public void dispose() { } - - @Override - public boolean isDisposed() { - return true; - } - }; + static final Disposable NEW_TIMER = new EmptyDispose(); public FlowableTimeoutTimed(Flowable source, long timeout, TimeUnit unit, Scheduler scheduler, Publisher other) { @@ -124,20 +116,7 @@ void scheduleTimeout(final long idx) { } if (timer.compareAndSet(d, NEW_TIMER)) { - d = worker.schedule(new Runnable() { - @Override - public void run() { - if (idx == index) { - done = true; - s.cancel(); - DisposableHelper.dispose(timer); - - subscribeNext(); - - worker.dispose(); - } - } - }, timeout, unit); + d = worker.schedule(new TimeoutTask(idx), timeout, unit); DisposableHelper.replace(timer, d); } @@ -178,6 +157,27 @@ public void dispose() { public boolean isDisposed() { return worker.isDisposed(); } + + final class TimeoutTask implements Runnable { + private final long idx; + + TimeoutTask(long idx) { + this.idx = idx; + } + + @Override + public void run() { + if (idx == index) { + done = true; + s.cancel(); + DisposableHelper.dispose(timer); + + subscribeNext(); + + worker.dispose(); + } + } + } } static final class TimeoutTimedSubscriber implements FlowableSubscriber, Disposable, Subscription { @@ -230,17 +230,7 @@ void scheduleTimeout(final long idx) { } if (timer.compareAndSet(d, NEW_TIMER)) { - d = worker.schedule(new Runnable() { - @Override - public void run() { - if (idx == index) { - done = true; - dispose(); - - actual.onError(new TimeoutException()); - } - } - }, timeout, unit); + d = worker.schedule(new TimeoutTask(idx), timeout, unit); DisposableHelper.replace(timer, d); } @@ -289,5 +279,35 @@ public void request(long n) { public void cancel() { dispose(); } + + final class TimeoutTask implements Runnable { + private final long idx; + + TimeoutTask(long idx) { + this.idx = idx; + } + + @Override + public void run() { + if (idx == index) { + done = true; + dispose(); + + actual.onError(new TimeoutException()); + } + } + } + } + + static final class EmptyDispose implements Disposable { + @Override + public void dispose() { } + + @Override + public boolean isDisposed() { + return true; + } } + + } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOn.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOn.java index f2158eb027..b5d6a1d001 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOn.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOn.java @@ -86,12 +86,14 @@ public void request(long n) { @Override public void cancel() { if (compareAndSet(false, true)) { - scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - s.cancel(); - } - }); + scheduler.scheduleDirect(new Cancellation()); + } + } + + final class Cancellation implements Runnable { + @Override + public void run() { + s.cancel(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java index 0e99732b1f..73e4819e6e 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java @@ -641,12 +641,7 @@ public void onSubscribe(Subscription s) { if (r != Long.MAX_VALUE) { produced(1); } - worker.schedule(new Runnable() { - @Override - public void run() { - complete(w); - } - }, timespan, unit); + worker.schedule(new Completion(w), timespan, unit); worker.schedulePeriodically(this, timeskip, timeskip, unit); @@ -785,12 +780,7 @@ void drainLoop() { produced(1); } - worker.schedule(new Runnable() { - @Override - public void run() { - complete(w); - } - }, timespan, unit); + worker.schedule(new Completion(w), timespan, unit); } else { a.onError(new MissingBackpressureException("Can't emit window due to lack of requests")); continue; @@ -839,6 +829,19 @@ static final class SubjectWork { this.open = open; } } + + final class Completion implements Runnable { + private final UnicastProcessor processor; + + public Completion(UnicastProcessor processor) { + this.processor = processor; + } + + @Override + public void run() { + complete(processor); + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java index 3d78461d50..ed7e554cce 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java @@ -40,29 +40,7 @@ protected void subscribeActual(Subscriber s) { serial.onSubscribe(wlf); - other.subscribe(new FlowableSubscriber() { - @Override - public void onSubscribe(Subscription s) { - if (wlf.setOther(s)) { - s.request(Long.MAX_VALUE); - } - } - - @Override - public void onNext(U t) { - wlf.lazySet(t); - } - - @Override - public void onError(Throwable t) { - wlf.otherError(t); - } - - @Override - public void onComplete() { - // nothing to do, the wlf will complete on its own pace - } - }); + other.subscribe(new FlowableWithLatestSubscriber(wlf)); source.subscribe(wlf); } @@ -138,4 +116,34 @@ public void otherError(Throwable e) { actual.onError(e); } } + + final class FlowableWithLatestSubscriber implements FlowableSubscriber { + private final WithLatestFromSubscriber wlf; + + FlowableWithLatestSubscriber(WithLatestFromSubscriber wlf) { + this.wlf = wlf; + } + + @Override + public void onSubscribe(Subscription s) { + if (wlf.setOther(s)) { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(U t) { + wlf.lazySet(t); + } + + @Override + public void onError(Throwable t) { + wlf.otherError(t); + } + + @Override + public void onComplete() { + // nothing to do, the wlf will complete on its own pace + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java index be514a6e84..772aebbf5d 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java @@ -82,12 +82,7 @@ protected void subscribeActual(Subscriber s) { } if (n == 0) { - new FlowableMap(source, new Function() { - @Override - public R apply(T t) throws Exception { - return combiner.apply(new Object[] { t }); - } - }).subscribeActual(s); + new FlowableMap(source, new SingletonArrayFunc()).subscribeActual(s); return; } @@ -298,4 +293,11 @@ public void dispose() { SubscriptionHelper.cancel(this); } } + + final class SingletonArrayFunc implements Function { + @Override + public R apply(T t) throws Exception { + return combiner.apply(new Object[] { t }); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java index 5c1c852ee7..46f0179ddd 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java @@ -41,12 +41,7 @@ protected void subscribeActual(MaybeObserver observer) { if (n == 1) { - sources[0].subscribe(new MaybeMap.MapMaybeObserver(observer, new Function() { - @Override - public R apply(T t) throws Exception { - return zipper.apply(new Object[] { t }); - } - })); + sources[0].subscribe(new MaybeMap.MapMaybeObserver(observer, new SingletonArrayFunc())); return; } @@ -193,4 +188,11 @@ public void onComplete() { parent.innerComplete(index); } } + + final class SingletonArrayFunc implements Function { + @Override + public R apply(T t) throws Exception { + return zipper.apply(new Object[] { t }); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java index 5ee11925cc..c6e6596809 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java @@ -61,12 +61,7 @@ protected void subscribeActual(MaybeObserver observer) { } if (n == 1) { - a[0].subscribe(new MaybeMap.MapMaybeObserver(observer, new Function() { - @Override - public R apply(T t) throws Exception { - return zipper.apply(new Object[] { t }); - } - })); + a[0].subscribe(new MaybeMap.MapMaybeObserver(observer, new SingletonArrayFunc())); return; } @@ -83,4 +78,10 @@ public R apply(T t) throws Exception { } } + final class SingletonArrayFunc implements Function { + @Override + public R apply(T t) throws Exception { + return zipper.apply(new Object[] { t }); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java index 047770dd86..44156e1171 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java +++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java @@ -79,44 +79,46 @@ public void onNext(T args) { * thread expect {@link Iterator#next()} called from a different thread to work. * @return the Iterator */ - public Iterator getIterable() { - return new Iterator() { - /** - * buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next(). - */ - private Object buf; - - @Override - public boolean hasNext() { - buf = value; - return !NotificationLite.isComplete(buf); - } + public Iterator getIterable() { + return new Iterator(); + } - @Override - public T next() { - try { - // if hasNext wasn't called before calling next. - if (buf == null) { - buf = value; - } - if (NotificationLite.isComplete(buf)) { - throw new NoSuchElementException(); - } - if (NotificationLite.isError(buf)) { - throw ExceptionHelper.wrapOrThrow(NotificationLite.getError(buf)); - } - return NotificationLite.getValue(buf); + final class Iterator implements java.util.Iterator { + /** + * buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next(). + */ + private Object buf; + + @Override + public boolean hasNext() { + buf = value; + return !NotificationLite.isComplete(buf); + } + + @Override + public T next() { + try { + // if hasNext wasn't called before calling next. + if (buf == null) { + buf = value; + } + if (NotificationLite.isComplete(buf)) { + throw new NoSuchElementException(); } - finally { - buf = null; + if (NotificationLite.isError(buf)) { + throw ExceptionHelper.wrapOrThrow(NotificationLite.getError(buf)); } + return NotificationLite.getValue(buf); } - - @Override - public void remove() { - throw new UnsupportedOperationException("Read only iterator"); + finally { + buf = null; } - }; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read only iterator"); + } } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java index 10f1a2aa34..c93a8e7f57 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java @@ -261,16 +261,7 @@ public void onSubscribe(Disposable s) { w.schedulePeriodically(this, timeskip, timeskip, unit); - w.schedule(new Runnable() { - @Override - public void run() { - synchronized (BufferSkipBoundedObserver.this) { - buffers.remove(b); - } - - fastPathOrderedEmit(b, false, w); - } - }, timespan, unit); + w.schedule(new RemoveFromBufferEmit(b), timespan, unit); } } @@ -352,22 +343,47 @@ public void run() { buffers.add(b); } - w.schedule(new Runnable() { - @Override - public void run() { - synchronized (BufferSkipBoundedObserver.this) { - buffers.remove(b); - } - - fastPathOrderedEmit(b, false, w); - } - }, timespan, unit); + w.schedule(new RemoveFromBuffer(b), timespan, unit); } @Override public void accept(Observer a, U v) { a.onNext(v); } + + final class RemoveFromBuffer implements Runnable { + private final U b; + + RemoveFromBuffer(U b) { + this.b = b; + } + + @Override + public void run() { + synchronized (BufferSkipBoundedObserver.this) { + buffers.remove(b); + } + + fastPathOrderedEmit(b, false, w); + } + } + + final class RemoveFromBufferEmit implements Runnable { + private final U buffer; + + RemoveFromBufferEmit(U buffer) { + this.buffer = buffer; + } + + @Override + public void run() { + synchronized (BufferSkipBoundedObserver.this) { + buffers.remove(buffer); + } + + fastPathOrderedEmit(buffer, false, w); + } + } } static final class BufferExactBoundedObserver> diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDelay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDelay.java index 4470883b96..066d8cef87 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDelay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDelay.java @@ -78,40 +78,17 @@ public void onSubscribe(Disposable s) { @Override public void onNext(final T t) { - w.schedule(new Runnable() { - @Override - public void run() { - actual.onNext(t); - } - }, delay, unit); + w.schedule(new OnNext(t), delay, unit); } @Override public void onError(final Throwable t) { - w.schedule(new Runnable() { - @Override - public void run() { - try { - actual.onError(t); - } finally { - w.dispose(); - } - } - }, delayError ? delay : 0, unit); + w.schedule(new OnError(t), delayError ? delay : 0, unit); } @Override public void onComplete() { - w.schedule(new Runnable() { - @Override - public void run() { - try { - actual.onComplete(); - } finally { - w.dispose(); - } - } - }, delay, unit); + w.schedule(new OnComplete(), delay, unit); } @Override @@ -124,5 +101,46 @@ public void dispose() { public boolean isDisposed() { return w.isDisposed(); } + + final class OnNext implements Runnable { + private final T t; + + OnNext(T t) { + this.t = t; + } + + @Override + public void run() { + actual.onNext(t); + } + } + + final class OnError implements Runnable { + private final Throwable throwable; + + public OnError(Throwable throwable) { + this.throwable = throwable; + } + + @Override + public void run() { + try { + actual.onError(throwable); + } finally { + w.dispose(); + } + } + } + + final class OnComplete implements Runnable { + @Override + public void run() { + try { + actual.onComplete(); + } finally { + w.dispose(); + } + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDelaySubscriptionOther.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDelaySubscriptionOther.java index 0d65cb56ef..b99383581b 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDelaySubscriptionOther.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDelaySubscriptionOther.java @@ -38,59 +38,71 @@ public void subscribeActual(final Observer child) { final SequentialDisposable serial = new SequentialDisposable(); child.onSubscribe(serial); - Observer otherObserver = new Observer() { - boolean done; + Observer otherObserver = new DelayObserver(serial, child); + + other.subscribe(otherObserver); + } + + final class DelayObserver implements Observer { + private final SequentialDisposable serial; + private final Observer child; + boolean done; + + DelayObserver(SequentialDisposable serial, Observer child) { + this.serial = serial; + this.child = child; + } + + @Override + public void onSubscribe(Disposable d) { + serial.update(d); + } + + @Override + public void onNext(U t) { + onComplete(); + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPlugins.onError(e); + return; + } + done = true; + child.onError(e); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + + main.subscribe(new OnComplete()); + } + + final class OnComplete implements Observer { @Override public void onSubscribe(Disposable d) { serial.update(d); } @Override - public void onNext(U t) { - onComplete(); + public void onNext(T value) { + child.onNext(value); } @Override public void onError(Throwable e) { - if (done) { - RxJavaPlugins.onError(e); - return; - } - done = true; child.onError(e); } @Override public void onComplete() { - if (done) { - return; - } - done = true; - - main.subscribe(new Observer() { - @Override - public void onSubscribe(Disposable d) { - serial.update(d); - } - - @Override - public void onNext(T value) { - child.onNext(value); - } - - @Override - public void onError(Throwable e) { - child.onError(e); - } - - @Override - public void onComplete() { - child.onComplete(); - } - }); + child.onComplete(); } - }; - - other.subscribe(otherObserver); + } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java index faf8957d9e..135d8bfb80 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java @@ -220,48 +220,23 @@ public static Function>, ObservableSource> re } public static Callable> replayCallable(final Observable parent) { - return new Callable>() { - @Override - public ConnectableObservable call() { - return parent.replay(); - } - }; + return new ReplayCallable(parent); } public static Callable> replayCallable(final Observable parent, final int bufferSize) { - return new Callable>() { - @Override - public ConnectableObservable call() { - return parent.replay(bufferSize); - } - }; + return new BufferedReplayCallable(parent, bufferSize); } public static Callable> replayCallable(final Observable parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) { - return new Callable>() { - @Override - public ConnectableObservable call() { - return parent.replay(bufferSize, time, unit, scheduler); - } - }; + return new BufferedTimedReplayCallable(parent, bufferSize, time, unit, scheduler); } public static Callable> replayCallable(final Observable parent, final long time, final TimeUnit unit, final Scheduler scheduler) { - return new Callable>() { - @Override - public ConnectableObservable call() { - return parent.replay(time, unit, scheduler); - } - }; + return new TimedReplayCallable(parent, time, unit, scheduler); } public static Function, ObservableSource> replayFunction(final Function, ? extends ObservableSource> selector, final Scheduler scheduler) { - return new Function, ObservableSource>() { - @Override - public ObservableSource apply(Observable t) throws Exception { - return Observable.wrap(selector.apply(t)).observeOn(scheduler); - } - }; + return new ReplayFunction(selector, scheduler); } enum ErrorMapperFilter implements Function, Throwable>, Predicate> { @@ -349,4 +324,86 @@ public Observable apply(T t) throws Exception { } + static final class ReplayCallable implements Callable> { + private final Observable parent; + + ReplayCallable(Observable parent) { + this.parent = parent; + } + + @Override + public ConnectableObservable call() { + return parent.replay(); + } + } + + static final class BufferedReplayCallable implements Callable> { + private final Observable parent; + private final int bufferSize; + + public BufferedReplayCallable(Observable parent, int bufferSize) { + this.parent = parent; + this.bufferSize = bufferSize; + } + + @Override + public ConnectableObservable call() { + return parent.replay(bufferSize); + } + } + + static final class BufferedTimedReplayCallable implements Callable> { + private final Observable parent; + private final int bufferSize; + private final long time; + private final TimeUnit unit; + private final Scheduler scheduler; + + BufferedTimedReplayCallable(Observable parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler) { + this.parent = parent; + this.bufferSize = bufferSize; + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public ConnectableObservable call() { + return parent.replay(bufferSize, time, unit, scheduler); + } + } + + static final class TimedReplayCallable implements Callable> { + private final Observable parent; + private final long time; + private final TimeUnit unit; + private final Scheduler scheduler; + + TimedReplayCallable(Observable parent, long time, TimeUnit unit, Scheduler scheduler) { + this.parent = parent; + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public ConnectableObservable call() { + return parent.replay(time, unit, scheduler); + } + } + + static final class ReplayFunction implements Function, ObservableSource> { + private final Function, ? extends ObservableSource> selector; + private final Scheduler scheduler; + + ReplayFunction(Function, ? extends ObservableSource> selector, Scheduler scheduler) { + this.selector = selector; + this.scheduler = scheduler; + } + + @Override + public ObservableSource apply(Observable t) throws Exception { + return Observable.wrap(selector.apply(t)).observeOn(scheduler); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java index e82fedef4c..b8c5206c2b 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java @@ -47,66 +47,7 @@ public final class ObservablePublish extends ConnectableObservable impleme public static ConnectableObservable create(ObservableSource source) { // the current connection to source needs to be shared between the operator and its onSubscribe call final AtomicReference> curr = new AtomicReference>(); - ObservableSource onSubscribe = new ObservableSource() { - @Override - public void subscribe(Observer child) { - // create the backpressure-managing producer for this child - InnerDisposable inner = new InnerDisposable(child); - child.onSubscribe(inner); - // concurrent connection/disconnection may change the state, - // we loop to be atomic while the child subscribes - for (;;) { - // get the current subscriber-to-source - PublishObserver r = curr.get(); - // if there isn't one or it is disposed - if (r == null || r.isDisposed()) { - // create a new subscriber to source - PublishObserver u = new PublishObserver(curr); - // let's try setting it as the current subscriber-to-source - if (!curr.compareAndSet(r, u)) { - // didn't work, maybe someone else did it or the current subscriber - // to source has just finished - continue; - } - // we won, let's use it going onwards - r = u; - } - - /* - * Try adding it to the current subscriber-to-source, add is atomic in respect - * to other adds and the termination of the subscriber-to-source. - */ - if (r.add(inner)) { - inner.setParent(r); - break; // NOPMD - } - /* - * The current PublishObserver has been terminated, try with a newer one. - */ - /* - * Note: although technically correct, concurrent disconnects can cause - * unexpected behavior such as child observers never receiving anything - * (unless connected again). An alternative approach, similar to - * PublishSubject would be to immediately terminate such child - * observers as well: - * - * Object term = r.terminalEvent; - * if (r.nl.isCompleted(term)) { - * child.onComplete(); - * } else { - * child.onError(r.nl.getError(term)); - * } - * return; - * - * The original concurrent behavior was non-deterministic in this regard as well. - * Allowing this behavior, however, may introduce another unexpected behavior: - * after disconnecting a previous connection, one might not be able to prepare - * a new connection right after a previous termination by subscribing new child - * observers asynchronously before a connect call. - */ - } - } - }; + ObservableSource onSubscribe = new PublishSource(curr); return RxJavaPlugins.onAssembly(new ObservablePublish(onSubscribe, source, curr)); } @@ -374,4 +315,71 @@ void setParent(PublishObserver p) { } } } + + static final class PublishSource implements ObservableSource { + private final AtomicReference> curr; + + PublishSource(AtomicReference> curr) { + this.curr = curr; + } + + @Override + public void subscribe(Observer child) { + // create the backpressure-managing producer for this child + InnerDisposable inner = new InnerDisposable(child); + child.onSubscribe(inner); + // concurrent connection/disconnection may change the state, + // we loop to be atomic while the child subscribes + for (;;) { + // get the current subscriber-to-source + PublishObserver r = curr.get(); + // if there isn't one or it is disposed + if (r == null || r.isDisposed()) { + // create a new subscriber to source + PublishObserver u = new PublishObserver(curr); + // let's try setting it as the current subscriber-to-source + if (!curr.compareAndSet(r, u)) { + // didn't work, maybe someone else did it or the current subscriber + // to source has just finished + continue; + } + // we won, let's use it going onwards + r = u; + } + + /* + * Try adding it to the current subscriber-to-source, add is atomic in respect + * to other adds and the termination of the subscriber-to-source. + */ + if (r.add(inner)) { + inner.setParent(r); + break; // NOPMD + } + /* + * The current PublishObserver has been terminated, try with a newer one. + */ + /* + * Note: although technically correct, concurrent disconnects can cause + * unexpected behavior such as child observers never receiving anything + * (unless connected again). An alternative approach, similar to + * PublishSubject would be to immediately terminate such child + * observers as well: + * + * Object term = r.terminalEvent; + * if (r.nl.isCompleted(term)) { + * child.onComplete(); + * } else { + * child.onError(r.nl.getError(term)); + * } + * return; + * + * The original concurrent behavior was non-deterministic in this regard as well. + * Allowing this behavior, however, may introduce another unexpected behavior: + * after disconnecting a previous connection, one might not be able to prepare + * a new connection right after a previous termination by subscribing new child + * observers asynchronously before a connect call. + */ + } + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java index 08a9e6084a..207bf96ff7 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java @@ -89,20 +89,7 @@ public void subscribeActual(final Observer subscriber) { private Consumer onSubscribe(final Observer observer, final AtomicBoolean writeLocked) { - return new Consumer() { - @Override - public void accept(Disposable subscription) { - try { - baseDisposable.add(subscription); - // ready to subscribe to source so do it - doSubscribe(observer, baseDisposable); - } finally { - // release the write lock - lock.unlock(); - writeLocked.set(false); - } - } - }; + return new DisposeConsumer(observer, writeLocked); } void doSubscribe(final Observer observer, final CompositeDisposable currentBase) { @@ -116,24 +103,7 @@ void doSubscribe(final Observer observer, final CompositeDisposable c } private Disposable disconnect(final CompositeDisposable current) { - return Disposables.fromRunnable(new Runnable() { - @Override - public void run() { - lock.lock(); - try { - if (baseDisposable == current) { - if (subscriptionCount.decrementAndGet() == 0) { - baseDisposable.dispose(); - // need a new baseDisposable because once - // disposed stays that way - baseDisposable = new CompositeDisposable(); - } - } - } finally { - lock.unlock(); - } - } - }); + return Disposables.fromRunnable(new DisposeTask(current)); } final class ConnectionObserver @@ -202,4 +172,51 @@ void cleanup() { } } + final class DisposeConsumer implements Consumer { + private final Observer observer; + private final AtomicBoolean writeLocked; + + DisposeConsumer(Observer observer, AtomicBoolean writeLocked) { + this.observer = observer; + this.writeLocked = writeLocked; + } + + @Override + public void accept(Disposable subscription) { + try { + baseDisposable.add(subscription); + // ready to subscribe to source so do it + doSubscribe(observer, baseDisposable); + } finally { + // release the write lock + lock.unlock(); + writeLocked.set(false); + } + } + } + + final class DisposeTask implements Runnable { + private final CompositeDisposable current; + + DisposeTask(CompositeDisposable current) { + this.current = current; + } + + @Override + public void run() { + lock.lock(); + try { + if (baseDisposable == current) { + if (subscriptionCount.decrementAndGet() == 0) { + baseDisposable.dispose(); + // need a new baseDisposable because once + // disposed stays that way + baseDisposable = new CompositeDisposable(); + } + } + } finally { + lock.unlock(); + } + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index 1234bc0f7e..e2f5fe145d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -45,12 +45,7 @@ interface BufferSupplier { } @SuppressWarnings("rawtypes") - static final BufferSupplier DEFAULT_UNBOUNDED_FACTORY = new BufferSupplier() { - @Override - public ReplayBuffer call() { - return new UnboundedReplayBuffer(16); - } - }; + static final BufferSupplier DEFAULT_UNBOUNDED_FACTORY = new UnBoundedFactory(); /** * Given a connectable observable factory, it multicasts over the generated @@ -64,32 +59,7 @@ public ReplayBuffer call() { public static Observable multicastSelector( final Callable> connectableFactory, final Function, ? extends ObservableSource> selector) { - return RxJavaPlugins.onAssembly(new Observable() { - @Override - protected void subscribeActual(Observer child) { - ConnectableObservable co; - ObservableSource observable; - try { - co = connectableFactory.call(); - observable = selector.apply(co); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - EmptyDisposable.error(e, child); - return; - } - - final ObserverResourceWrapper srw = new ObserverResourceWrapper(child); - - observable.subscribe(srw); - - co.connect(new Consumer() { - @Override - public void accept(Disposable r) { - srw.setResource(r); - } - }); - } - }); + return RxJavaPlugins.onAssembly(new MulticastReplay(connectableFactory, selector)); } /** @@ -102,17 +72,7 @@ public void accept(Disposable r) { */ public static ConnectableObservable observeOn(final ConnectableObservable co, final Scheduler scheduler) { final Observable observable = co.observeOn(scheduler); - return RxJavaPlugins.onAssembly(new ConnectableObservable() { - @Override - public void connect(Consumer connection) { - co.connect(connection); - } - - @Override - protected void subscribeActual(Observer observer) { - observable.subscribe(observer); - } - }); + return RxJavaPlugins.onAssembly(new Replay(co, observable)); } /** @@ -138,12 +98,7 @@ public static ConnectableObservable create(ObservableSource source, if (bufferSize == Integer.MAX_VALUE) { return createFrom(source); } - return create(source, new BufferSupplier() { - @Override - public ReplayBuffer call() { - return new SizeBoundReplayBuffer(bufferSize); - } - }); + return create(source, new ReplayBufferSupplier(bufferSize)); } /** @@ -172,12 +127,7 @@ public static ConnectableObservable create(ObservableSource source, */ public static ConnectableObservable create(ObservableSource source, final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize) { - return create(source, new BufferSupplier() { - @Override - public ReplayBuffer call() { - return new SizeAndTimeBoundReplayBuffer(bufferSize, maxAge, unit, scheduler); - } - }); + return create(source, new ScheduledReplaySupplier(bufferSize, maxAge, unit, scheduler)); } /** @@ -190,54 +140,7 @@ static ConnectableObservable create(ObservableSource source, final BufferSupplier bufferFactory) { // the current connection to source needs to be shared between the operator and its onSubscribe call final AtomicReference> curr = new AtomicReference>(); - ObservableSource onSubscribe = new ObservableSource() { - @Override - public void subscribe(Observer child) { - // concurrent connection/disconnection may change the state, - // we loop to be atomic while the child subscribes - for (;;) { - // get the current subscriber-to-source - ReplayObserver r = curr.get(); - // if there isn't one - if (r == null) { - // create a new subscriber to source - ReplayBuffer buf = bufferFactory.call(); - - ReplayObserver u = new ReplayObserver(buf); - // let's try setting it as the current subscriber-to-source - if (!curr.compareAndSet(null, u)) { - // didn't work, maybe someone else did it or the current subscriber - // to source has just finished - continue; - } - // we won, let's use it going onwards - r = u; - } - - // create the backpressure-managing producer for this child - InnerDisposable inner = new InnerDisposable(r, child); - // the producer has been registered with the current subscriber-to-source so - // at least it will receive the next terminal event - // setting the producer will trigger the first request to be considered by - // the subscriber-to-source. - child.onSubscribe(inner); - // we try to add it to the array of observers - // if it fails, no worries because we will still have its buffer - // so it is going to replay it for us - r.add(inner); - - if (inner.isDisposed()) { - r.remove(inner); - return; - } - - // replay the contents of the buffer - r.buffer.replay(inner); - - break; // NOPMD - } - } - }; + ObservableSource onSubscribe = new ReplaySource(curr, bufferFactory); return RxJavaPlugins.onAssembly(new ObservableReplay(onSubscribe, source, curr, bufferFactory)); } @@ -988,4 +891,163 @@ Node getHead() { return prev; } } + + static final class UnBoundedFactory implements BufferSupplier { + @Override + public ReplayBuffer call() { + return new UnboundedReplayBuffer(16); + } + } + + static final class DisposeConsumer implements Consumer { + private final ObserverResourceWrapper srw; + + DisposeConsumer(ObserverResourceWrapper srw) { + this.srw = srw; + } + + @Override + public void accept(Disposable r) { + srw.setResource(r); + } + } + + static final class ReplayBufferSupplier implements BufferSupplier { + private final int bufferSize; + + ReplayBufferSupplier(int bufferSize) { + this.bufferSize = bufferSize; + } + + @Override + public ReplayBuffer call() { + return new SizeBoundReplayBuffer(bufferSize); + } + } + + static final class ScheduledReplaySupplier implements BufferSupplier { + private final int bufferSize; + private final long maxAge; + private final TimeUnit unit; + private final Scheduler scheduler; + + ScheduledReplaySupplier(int bufferSize, long maxAge, TimeUnit unit, Scheduler scheduler) { + this.bufferSize = bufferSize; + this.maxAge = maxAge; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public ReplayBuffer call() { + return new SizeAndTimeBoundReplayBuffer(bufferSize, maxAge, unit, scheduler); + } + } + + static final class ReplaySource implements ObservableSource { + private final AtomicReference> curr; + private final BufferSupplier bufferFactory; + + ReplaySource(AtomicReference> curr, BufferSupplier bufferFactory) { + this.curr = curr; + this.bufferFactory = bufferFactory; + } + + @Override + public void subscribe(Observer child) { + // concurrent connection/disconnection may change the state, + // we loop to be atomic while the child subscribes + for (;;) { + // get the current subscriber-to-source + ReplayObserver r = curr.get(); + // if there isn't one + if (r == null) { + // create a new subscriber to source + ReplayBuffer buf = bufferFactory.call(); + + ReplayObserver u = new ReplayObserver(buf); + // let's try setting it as the current subscriber-to-source + if (!curr.compareAndSet(null, u)) { + // didn't work, maybe someone else did it or the current subscriber + // to source has just finished + continue; + } + // we won, let's use it going onwards + r = u; + } + + // create the backpressure-managing producer for this child + InnerDisposable inner = new InnerDisposable(r, child); + // the producer has been registered with the current subscriber-to-source so + // at least it will receive the next terminal event + // setting the producer will trigger the first request to be considered by + // the subscriber-to-source. + child.onSubscribe(inner); + // we try to add it to the array of observers + // if it fails, no worries because we will still have its buffer + // so it is going to replay it for us + r.add(inner); + + if (inner.isDisposed()) { + r.remove(inner); + return; + } + + // replay the contents of the buffer + r.buffer.replay(inner); + + break; // NOPMD + } + } + } + + static final class MulticastReplay extends Observable { + private final Callable> connectableFactory; + private final Function, ? extends ObservableSource> selector; + + MulticastReplay(Callable> connectableFactory, Function, ? extends ObservableSource> selector) { + this.connectableFactory = connectableFactory; + this.selector = selector; + } + + @Override + protected void subscribeActual(Observer child) { + ConnectableObservable co; + ObservableSource observable; + try { + co = connectableFactory.call(); + observable = selector.apply(co); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + EmptyDisposable.error(e, child); + return; + } + + final ObserverResourceWrapper srw = new ObserverResourceWrapper(child); + + observable.subscribe(srw); + + co.connect(new DisposeConsumer(srw)); + } + } + + static final class Replay extends ConnectableObservable { + private final ConnectableObservable co; + private final Observable observable; + + Replay(ConnectableObservable co, Observable observable) { + this.co = co; + this.observable = observable; + } + + @Override + public void connect(Consumer connection) { + co.connect(connection); + } + + @Override + protected void subscribeActual(Observer observer) { + observable.subscribe(observer); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSkipUntil.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSkipUntil.java index 7cff8acc7c..c9f711ff3e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSkipUntil.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSkipUntil.java @@ -36,33 +36,7 @@ public void subscribeActual(Observer child) { final SkipUntilObserver sus = new SkipUntilObserver(serial, frc); - other.subscribe(new Observer() { - Disposable s; - @Override - public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; - frc.setResource(1, s); - } - } - - @Override - public void onNext(U t) { - s.dispose(); - sus.notSkipping = true; - } - - @Override - public void onError(Throwable t) { - frc.dispose(); - serial.onError(t); - } - - @Override - public void onComplete() { - sus.notSkipping = true; - } - }); + other.subscribe(new SkipUntil(frc, sus, serial)); source.subscribe(sus); } @@ -113,4 +87,42 @@ public void onComplete() { actual.onComplete(); } } + + final class SkipUntil implements Observer { + private final ArrayCompositeDisposable frc; + private final SkipUntilObserver sus; + private final SerializedObserver serial; + Disposable s; + + SkipUntil(ArrayCompositeDisposable frc, SkipUntilObserver sus, SerializedObserver serial) { + this.frc = frc; + this.sus = sus; + this.serial = serial; + } + + @Override + public void onSubscribe(Disposable s) { + if (DisposableHelper.validate(this.s, s)) { + this.s = s; + frc.setResource(1, s); + } + } + + @Override + public void onNext(U t) { + s.dispose(); + sus.notSkipping = true; + } + + @Override + public void onError(Throwable t) { + frc.dispose(); + serial.onError(t); + } + + @Override + public void onComplete() { + sus.notSkipping = true; + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSubscribeOn.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSubscribeOn.java index 57006fe7cc..57f4666b52 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSubscribeOn.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSubscribeOn.java @@ -33,12 +33,7 @@ public void subscribeActual(final Observer s) { s.onSubscribe(parent); - parent.setDisposable(scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - source.subscribe(parent); - } - })); + parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } static final class SubscribeOnObserver extends AtomicReference implements Observer, Disposable { @@ -88,4 +83,17 @@ void setDisposable(Disposable d) { DisposableHelper.setOnce(this, d); } } + + final class SubscribeTask implements Runnable { + private final SubscribeOnObserver parent; + + SubscribeTask(SubscribeOnObserver parent) { + this.parent = parent; + } + + @Override + public void run() { + source.subscribe(parent); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeUntil.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeUntil.java index f1ec17ad37..64639abc0a 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeUntil.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeUntil.java @@ -36,27 +36,7 @@ public void subscribeActual(Observer child) { child.onSubscribe(frc); - other.subscribe(new Observer() { - @Override - public void onSubscribe(Disposable s) { - frc.setResource(1, s); - } - @Override - public void onNext(U t) { - frc.dispose(); - serial.onComplete(); - } - @Override - public void onError(Throwable t) { - frc.dispose(); - serial.onError(t); - } - @Override - public void onComplete() { - frc.dispose(); - serial.onComplete(); - } - }); + other.subscribe(new TakeUntil(frc, serial)); source.subscribe(tus); } @@ -99,4 +79,37 @@ public void onComplete() { actual.onComplete(); } } + + final class TakeUntil implements Observer { + private final ArrayCompositeDisposable frc; + private final SerializedObserver serial; + + TakeUntil(ArrayCompositeDisposable frc, SerializedObserver serial) { + this.frc = frc; + this.serial = serial; + } + + @Override + public void onSubscribe(Disposable s) { + frc.setResource(1, s); + } + + @Override + public void onNext(U t) { + frc.dispose(); + serial.onComplete(); + } + + @Override + public void onError(Throwable t) { + frc.dispose(); + serial.onError(t); + } + + @Override + public void onComplete() { + frc.dispose(); + serial.onComplete(); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java index 854b40a295..0c152df34d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java @@ -31,15 +31,7 @@ public final class ObservableTimeoutTimed extends AbstractObservableWithUpstr final ObservableSource other; - static final Disposable NEW_TIMER = new Disposable() { - @Override - public void dispose() { } - - @Override - public boolean isDisposed() { - return true; - } - }; + static final Disposable NEW_TIMER = new EmptyDisposable(); public ObservableTimeoutTimed(ObservableSource source, long timeout, TimeUnit unit, Scheduler scheduler, ObservableSource other) { @@ -124,20 +116,7 @@ void scheduleTimeout(final long idx) { } if (compareAndSet(d, NEW_TIMER)) { - d = worker.schedule(new Runnable() { - @Override - public void run() { - if (idx == index) { - done = true; - s.dispose(); - DisposableHelper.dispose(TimeoutTimedOtherObserver.this); - - subscribeNext(); - - worker.dispose(); - } - } - }, timeout, unit); + d = worker.schedule(new SubscribeNext(idx), timeout, unit); DisposableHelper.replace(this, d); } @@ -178,6 +157,27 @@ public void dispose() { public boolean isDisposed() { return worker.isDisposed(); } + + final class SubscribeNext implements Runnable { + private final long idx; + + SubscribeNext(long idx) { + this.idx = idx; + } + + @Override + public void run() { + if (idx == index) { + done = true; + s.dispose(); + DisposableHelper.dispose(TimeoutTimedOtherObserver.this); + + subscribeNext(); + + worker.dispose(); + } + } + } } static final class TimeoutTimedObserver @@ -233,20 +233,7 @@ void scheduleTimeout(final long idx) { } if (compareAndSet(d, NEW_TIMER)) { - d = worker.schedule(new Runnable() { - @Override - public void run() { - if (idx == index) { - done = true; - s.dispose(); - DisposableHelper.dispose(TimeoutTimedObserver.this); - - actual.onError(new TimeoutException()); - - worker.dispose(); - } - } - }, timeout, unit); + d = worker.schedule(new TimeoutTask(idx), timeout, unit); DisposableHelper.replace(this, d); } @@ -285,5 +272,36 @@ public void dispose() { public boolean isDisposed() { return worker.isDisposed(); } + + final class TimeoutTask implements Runnable { + private final long idx; + + TimeoutTask(long idx) { + this.idx = idx; + } + + @Override + public void run() { + if (idx == index) { + done = true; + s.dispose(); + DisposableHelper.dispose(TimeoutTimedObserver.this); + + actual.onError(new TimeoutException()); + + worker.dispose(); + } + } + } + } + + static final class EmptyDisposable implements Disposable { + @Override + public void dispose() { } + + @Override + public boolean isDisposed() { + return true; + } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOn.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOn.java index 4554a3578c..bfcc33308e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOn.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOn.java @@ -80,12 +80,7 @@ public void onComplete() { @Override public void dispose() { if (compareAndSet(false, true)) { - scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - s.dispose(); - } - }); + scheduler.scheduleDirect(new DisposeTask()); } } @@ -93,5 +88,12 @@ public void run() { public boolean isDisposed() { return get(); } + + final class DisposeTask implements Runnable { + @Override + public void run() { + s.dispose(); + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java index 5edc60688e..64295aa3ae 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java @@ -552,12 +552,7 @@ public void onSubscribe(Disposable s) { windows.add(w); actual.onNext(w); - worker.schedule(new Runnable() { - @Override - public void run() { - complete(w); - } - }, timespan, unit); + worker.schedule(new CompletionTask(w), timespan, unit); worker.schedulePeriodically(this, timeskip, timeskip, unit); } @@ -685,12 +680,7 @@ void drainLoop() { ws.add(w); a.onNext(w); - worker.schedule(new Runnable() { - @Override - public void run() { - complete(w); - } - }, timespan, unit); + worker.schedule(new CompletionTask(w), timespan, unit); } else { ws.remove(work.w); work.w.onComplete(); @@ -735,6 +725,19 @@ static final class SubjectWork { this.open = open; } } + + final class CompletionTask implements Runnable { + private final UnicastSubject w; + + CompletionTask(UnicastSubject w) { + this.w = w; + } + + @Override + public void run() { + complete(w); + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFrom.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFrom.java index 9ae50dc453..d60a086687 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFrom.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFrom.java @@ -40,27 +40,7 @@ public void subscribeActual(Observer t) { serial.onSubscribe(wlf); - other.subscribe(new Observer() { - @Override - public void onSubscribe(Disposable s) { - wlf.setOther(s); - } - - @Override - public void onNext(U t) { - wlf.lazySet(t); - } - - @Override - public void onError(Throwable t) { - wlf.otherError(t); - } - - @Override - public void onComplete() { - // nothing to do, the wlf will complete on its own pace - } - }); + other.subscribe(new WithLastFrom(wlf)); source.subscribe(wlf); } @@ -135,4 +115,32 @@ public void otherError(Throwable e) { actual.onError(e); } } + + final class WithLastFrom implements Observer { + private final WithLatestFromObserver wlf; + + WithLastFrom(WithLatestFromObserver wlf) { + this.wlf = wlf; + } + + @Override + public void onSubscribe(Disposable s) { + wlf.setOther(s); + } + + @Override + public void onNext(U t) { + wlf.lazySet(t); + } + + @Override + public void onError(Throwable t) { + wlf.otherError(t); + } + + @Override + public void onComplete() { + // nothing to do, the wlf will complete on its own pace + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java index dd58b0123f..5b6360741a 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java @@ -83,12 +83,7 @@ protected void subscribeActual(Observer s) { } if (n == 0) { - new ObservableMap(source, new Function() { - @Override - public R apply(T t) throws Exception { - return combiner.apply(new Object[] { t }); - } - }).subscribeActual(s); + new ObservableMap(source, new SingletonArrayFunc()).subscribeActual(s); return; } @@ -287,4 +282,11 @@ public void dispose() { DisposableHelper.dispose(this); } } + + final class SingletonArrayFunc implements Function { + @Override + public R apply(T t) throws Exception { + return combiner.apply(new Object[] { t }); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleContains.java b/src/main/java/io/reactivex/internal/operators/single/SingleContains.java index e0dceebad2..b57d8eefbd 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleContains.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleContains.java @@ -35,33 +35,40 @@ public SingleContains(SingleSource source, Object value, BiPredicate s) { - source.subscribe(new SingleObserver() { + source.subscribe(new Single(s)); + } - @Override - public void onSubscribe(Disposable d) { - s.onSubscribe(d); - } + final class Single implements SingleObserver { - @Override - public void onSuccess(T v) { - boolean b; - - try { - b = comparer.test(v, value); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - s.onError(ex); - return; - } - s.onSuccess(b); - } + private final SingleObserver s; + + Single(SingleObserver s) { + this.s = s; + } - @Override - public void onError(Throwable e) { - s.onError(e); + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onSuccess(T v) { + boolean b; + + try { + b = comparer.test(v, value); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.onError(ex); + return; } + s.onSuccess(b); + } - }); - } + @Override + public void onError(Throwable e) { + s.onError(e); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDelay.java b/src/main/java/io/reactivex/internal/operators/single/SingleDelay.java index eba6af999b..e83bb06320 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleDelay.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDelay.java @@ -39,33 +39,57 @@ protected void subscribeActual(final SingleObserver s) { final SequentialDisposable sd = new SequentialDisposable(); s.onSubscribe(sd); - source.subscribe(new SingleObserver() { - @Override - public void onSubscribe(Disposable d) { - sd.replace(d); + source.subscribe(new Delay(sd, s)); + } + + final class Delay implements SingleObserver { + private final SequentialDisposable sd; + private final SingleObserver s; + + Delay(SequentialDisposable sd, SingleObserver s) { + this.sd = sd; + this.s = s; + } + + @Override + public void onSubscribe(Disposable d) { + sd.replace(d); + } + + @Override + public void onSuccess(final T value) { + sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit)); + } + + @Override + public void onError(final Throwable e) { + sd.replace(scheduler.scheduleDirect(new OnError(e), 0, unit)); + } + + final class OnSuccess implements Runnable { + private final T value; + + OnSuccess(T value) { + this.value = value; } @Override - public void onSuccess(final T value) { - sd.replace(scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - s.onSuccess(value); - } - }, time, unit)); + public void run() { + s.onSuccess(value); } + } - @Override - public void onError(final Throwable e) { - sd.replace(scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - s.onError(e); - } - }, 0, unit)); + final class OnError implements Runnable { + private final Throwable e; + + public OnError(Throwable e) { + this.e = e; } - }); + @Override + public void run() { + s.onError(e); + } + } } - } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnError.java b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnError.java index 9f1197375c..bd7bcd0d68 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnError.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnError.java @@ -32,29 +32,36 @@ public SingleDoOnError(SingleSource source, Consumer onErr @Override protected void subscribeActual(final SingleObserver s) { - source.subscribe(new SingleObserver() { - @Override - public void onSubscribe(Disposable d) { - s.onSubscribe(d); - } + source.subscribe(new DoOnError(s)); + } - @Override - public void onSuccess(T value) { - s.onSuccess(value); - } + final class DoOnError implements SingleObserver { + private final SingleObserver s; + + DoOnError(SingleObserver s) { + this.s = s; + } - @Override - public void onError(Throwable e) { - try { - onError.accept(e); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - e = new CompositeException(e, ex); - } - s.onError(e); + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + try { + onError.accept(e); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + e = new CompositeException(e, ex); } + s.onError(e); + } - }); } - } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnEvent.java b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnEvent.java index 079a45bb62..c262b51ff6 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnEvent.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnEvent.java @@ -34,35 +34,43 @@ public SingleDoOnEvent(SingleSource source, BiConsumer s) { - source.subscribe(new SingleObserver() { - @Override - public void onSubscribe(Disposable d) { - s.onSubscribe(d); - } + source.subscribe(new DoOnEvent(s)); + } + + final class DoOnEvent implements SingleObserver { + private final SingleObserver s; - @Override - public void onSuccess(T value) { - try { - onEvent.accept(value, null); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - s.onError(ex); - return; - } + DoOnEvent(SingleObserver s) { + this.s = s; + } - s.onSuccess(value); + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + try { + onEvent.accept(value, null); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.onError(ex); + return; } - @Override - public void onError(Throwable e) { - try { - onEvent.accept(null, e); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - e = new CompositeException(e, ex); - } - s.onError(e); + s.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + try { + onEvent.accept(null, e); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + e = new CompositeException(e, ex); } - }); + s.onError(e); + } } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnSuccess.java b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnSuccess.java index 85c7d715ca..34ee281777 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnSuccess.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnSuccess.java @@ -32,30 +32,37 @@ public SingleDoOnSuccess(SingleSource source, Consumer onSuccess) @Override protected void subscribeActual(final SingleObserver s) { - source.subscribe(new SingleObserver() { - @Override - public void onSubscribe(Disposable d) { - s.onSubscribe(d); - } + source.subscribe(new DoOnSuccess(s)); + } - @Override - public void onSuccess(T value) { - try { - onSuccess.accept(value); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - s.onError(ex); - return; - } - s.onSuccess(value); - } + final class DoOnSuccess implements SingleObserver { + private final SingleObserver s; + + DoOnSuccess(SingleObserver s) { + this.s = s; + } - @Override - public void onError(Throwable e) { - s.onError(e); + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + try { + onSuccess.accept(value); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.onError(ex); + return; } + s.onSuccess(value); + } - }); - } + @Override + public void onError(Throwable e) { + s.onError(e); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleOnErrorReturn.java b/src/main/java/io/reactivex/internal/operators/single/SingleOnErrorReturn.java index 7261f3faa4..d7f9c4c715 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleOnErrorReturn.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleOnErrorReturn.java @@ -37,45 +37,52 @@ public SingleOnErrorReturn(SingleSource source, @Override protected void subscribeActual(final SingleObserver s) { - source.subscribe(new SingleObserver() { - - @Override - public void onError(Throwable e) { - T v; - - if (valueSupplier != null) { - try { - v = valueSupplier.apply(e); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - s.onError(new CompositeException(e, ex)); - return; - } - } else { - v = value; - } + source.subscribe(new OnErrorReturn(s)); + } + + final class OnErrorReturn implements SingleObserver { + + private final SingleObserver observer; + + OnErrorReturn(SingleObserver observer) { + this.observer = observer; + } + + @Override + public void onError(Throwable e) { + T v; - if (v == null) { - NullPointerException npe = new NullPointerException("Value supplied was null"); - npe.initCause(e); - s.onError(npe); + if (valueSupplier != null) { + try { + v = valueSupplier.apply(e); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + observer.onError(new CompositeException(e, ex)); return; } - - s.onSuccess(v); + } else { + v = value; } - @Override - public void onSubscribe(Disposable d) { - s.onSubscribe(d); + if (v == null) { + NullPointerException npe = new NullPointerException("Value supplied was null"); + npe.initCause(e); + observer.onError(npe); + return; } - @Override - public void onSuccess(T value) { - s.onSuccess(value); - } + observer.onSuccess(v); + } - }); - } + @Override + public void onSubscribe(Disposable d) { + observer.onSubscribe(d); + } + @Override + public void onSuccess(T value) { + observer.onSuccess(value); + } + + } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleTimeout.java b/src/main/java/io/reactivex/internal/operators/single/SingleTimeout.java index 75a9794e02..9156c39625 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleTimeout.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleTimeout.java @@ -48,51 +48,45 @@ protected void subscribeActual(final SingleObserver s) { final AtomicBoolean once = new AtomicBoolean(); - Disposable timer = scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - if (once.compareAndSet(false, true)) { - if (other != null) { - set.clear(); - other.subscribe(new SingleObserver() { - - @Override - public void onError(Throwable e) { - set.dispose(); - s.onError(e); - } - - @Override - public void onSubscribe(Disposable d) { - set.add(d); - } - - @Override - public void onSuccess(T value) { - set.dispose(); - s.onSuccess(value); - } - - }); - } else { - set.dispose(); - s.onError(new TimeoutException()); - } - } - } - }, timeout, unit); + Disposable timer = scheduler.scheduleDirect(new TimeoutDispose(once, set, s), timeout, unit); set.add(timer); - source.subscribe(new SingleObserver() { + source.subscribe(new TimeoutObserver(once, set, s)); - @Override - public void onError(Throwable e) { - if (once.compareAndSet(false, true)) { + } + + final class TimeoutDispose implements Runnable { + private final AtomicBoolean once; + private final CompositeDisposable set; + private final SingleObserver s; + + TimeoutDispose(AtomicBoolean once, CompositeDisposable set, SingleObserver s) { + this.once = once; + this.set = set; + this.s = s; + } + + @Override + public void run() { + if (once.compareAndSet(false, true)) { + if (other != null) { + set.clear(); + other.subscribe(new TimeoutObserver()); + } else { set.dispose(); - s.onError(e); + s.onError(new TimeoutException()); } } + } + + final class TimeoutObserver implements SingleObserver { + + @Override + public void onError(Throwable e) { + set.dispose(); + s.onError(e); + } @Override public void onSubscribe(Disposable d) { @@ -101,14 +95,45 @@ public void onSubscribe(Disposable d) { @Override public void onSuccess(T value) { - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onSuccess(value); - } + set.dispose(); + s.onSuccess(value); } - }); - + } } + final class TimeoutObserver implements SingleObserver { + + private final AtomicBoolean once; + private final CompositeDisposable set; + private final SingleObserver s; + + TimeoutObserver(AtomicBoolean once, CompositeDisposable set, SingleObserver s) { + this.once = once; + this.set = set; + this.s = s; + } + + @Override + public void onError(Throwable e) { + if (once.compareAndSet(false, true)) { + set.dispose(); + s.onError(e); + } + } + + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + if (once.compareAndSet(false, true)) { + set.dispose(); + s.onSuccess(value); + } + } + + } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleZipArray.java b/src/main/java/io/reactivex/internal/operators/single/SingleZipArray.java index 2c02210637..8f4e867db4 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleZipArray.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleZipArray.java @@ -41,12 +41,7 @@ protected void subscribeActual(SingleObserver observer) { if (n == 1) { - sources[0].subscribe(new SingleMap.MapSingleObserver(observer, new Function() { - @Override - public R apply(T t) throws Exception { - return zipper.apply(new Object[] { t }); - } - })); + sources[0].subscribe(new SingleMap.MapSingleObserver(observer, new SingletonArrayFunc())); return; } @@ -182,4 +177,11 @@ public void onError(Throwable e) { parent.innerError(e, index); } } + + final class SingletonArrayFunc implements Function { + @Override + public R apply(T t) throws Exception { + return zipper.apply(new Object[] { t }); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleZipIterable.java b/src/main/java/io/reactivex/internal/operators/single/SingleZipIterable.java index 3ab5f7ec3f..5b5f711aeb 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleZipIterable.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleZipIterable.java @@ -61,12 +61,7 @@ protected void subscribeActual(SingleObserver observer) { } if (n == 1) { - a[0].subscribe(new SingleMap.MapSingleObserver(observer, new Function() { - @Override - public R apply(T t) throws Exception { - return zipper.apply(new Object[] { t }); - } - })); + a[0].subscribe(new SingleMap.MapSingleObserver(observer, new SingletonArrayFunc())); return; } @@ -83,4 +78,10 @@ public R apply(T t) throws Exception { } } + final class SingletonArrayFunc implements Function { + @Override + public R apply(T t) throws Exception { + return zipper.apply(new Object[] { t }); + } + } }