diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java index d843d4e0f7..319cd75e8d 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java @@ -104,9 +104,6 @@ public void onSubscribe(Disposable d) { } return; } - if (once.get() || set.isDisposed()) { - return; - } // no need to have separate subscribers because inner is stateless c.subscribe(inner); diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableConcat.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableConcat.java index baf3b3afc9..ae6a68aa2c 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableConcat.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableConcat.java @@ -19,9 +19,10 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.MissingBackpressureException; -import io.reactivex.internal.disposables.SequentialDisposable; -import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.exceptions.*; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.fuseable.*; +import io.reactivex.internal.queue.*; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.plugins.RxJavaPlugins; @@ -36,133 +37,218 @@ public CompletableConcat(Publisher sources, int pre @Override public void subscribeActual(CompletableObserver s) { - CompletableConcatSubscriber parent = new CompletableConcatSubscriber(s, prefetch); - sources.subscribe(parent); + sources.subscribe(new CompletableConcatSubscriber(s, prefetch)); } static final class CompletableConcatSubscriber extends AtomicInteger implements Subscriber, Disposable { - private static final long serialVersionUID = 7412667182931235013L; + private static final long serialVersionUID = 9032184911934499404L; + final CompletableObserver actual; + final int prefetch; - final SequentialDisposable sd; - final SpscArrayQueue queue; + final int limit; + + final ConcatInnerObserver inner; + + final AtomicBoolean once; + + int sourceFused; + + int consumed; + + SimpleQueue queue; Subscription s; volatile boolean done; - final AtomicBoolean once = new AtomicBoolean(); - - final ConcatInnerObserver inner; + volatile boolean active; CompletableConcatSubscriber(CompletableObserver actual, int prefetch) { this.actual = actual; this.prefetch = prefetch; - this.queue = new SpscArrayQueue(prefetch); - this.sd = new SequentialDisposable(); - this.inner = new ConcatInnerObserver(); + this.inner = new ConcatInnerObserver(this); + this.once = new AtomicBoolean(); + this.limit = prefetch - (prefetch >> 2); } @Override public void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.s, s)) { this.s = s; + + long r = prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch; + + if (s instanceof QueueSubscription) { + @SuppressWarnings("unchecked") + QueueSubscription qs = (QueueSubscription) s; + + int m = qs.requestFusion(QueueSubscription.ANY); + + if (m == QueueSubscription.SYNC) { + sourceFused = m; + queue = qs; + done = true; + actual.onSubscribe(this); + drain(); + return; + } + if (m == QueueSubscription.ASYNC) { + sourceFused = m; + queue = qs; + actual.onSubscribe(this); + s.request(r); + return; + } + } + + if (prefetch == Integer.MAX_VALUE) { + queue = new SpscLinkedArrayQueue(Flowable.bufferSize()); + } else { + queue = new SpscArrayQueue(prefetch); + } + actual.onSubscribe(this); - s.request(prefetch); + + s.request(r); } } @Override public void onNext(CompletableSource t) { - if (!queue.offer(t)) { - onError(new MissingBackpressureException()); - return; - } - if (getAndIncrement() == 0) { - next(); + if (sourceFused == QueueSubscription.NONE) { + if (!queue.offer(t)) { + onError(new MissingBackpressureException()); + return; + } } + drain(); } @Override public void onError(Throwable t) { if (once.compareAndSet(false, true)) { + DisposableHelper.dispose(inner); actual.onError(t); - return; + } else { + RxJavaPlugins.onError(t); } - done = true; - RxJavaPlugins.onError(t); } @Override public void onComplete() { - if (done) { - return; - } done = true; - if (getAndIncrement() == 0) { - next(); - } - } - - void innerError(Throwable e) { - s.cancel(); - onError(e); - } - - void innerComplete() { - if (decrementAndGet() != 0) { - next(); - } - if (!done) { - s.request(1); - } + drain(); } @Override public void dispose() { s.cancel(); - sd.dispose(); + DisposableHelper.dispose(inner); } @Override public boolean isDisposed() { - return sd.isDisposed(); + return DisposableHelper.isDisposed(inner.get()); } - void next() { - boolean d = done; - CompletableSource c = queue.poll(); - if (c == null) { - if (d) { - if (once.compareAndSet(false, true)) { - actual.onComplete(); - } + void drain() { + if (getAndIncrement() != 0) { + return; + } + + for (;;) { + if (isDisposed()) { return; } - RxJavaPlugins.onError(new IllegalStateException("Queue is empty?!")); - return; + + if (!active) { + + boolean d = done; + + CompletableSource cs; + + try { + cs = queue.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + innerError(ex); + return; + } + + boolean empty = cs == null; + + if (d && empty) { + if (once.compareAndSet(false, true)) { + actual.onComplete(); + } + return; + } + + if (!empty) { + active = true; + cs.subscribe(inner); + request(); + } + } + + if (decrementAndGet() == 0) { + break; + } + } + } + + void request() { + if (sourceFused != QueueSubscription.SYNC) { + int p = consumed + 1; + if (p == limit) { + consumed = 0; + s.request(p); + } else { + consumed = p; + } + } + } + + void innerError(Throwable e) { + if (once.compareAndSet(false, true)) { + s.cancel(); + actual.onError(e); + } else { + RxJavaPlugins.onError(e); } + } - c.subscribe(inner); + void innerComplete() { + active = false; + drain(); } - final class ConcatInnerObserver implements CompletableObserver { + static final class ConcatInnerObserver extends AtomicReference implements CompletableObserver { + private static final long serialVersionUID = -5454794857847146511L; + + final CompletableConcatSubscriber parent; + + ConcatInnerObserver(CompletableConcatSubscriber parent) { + this.parent = parent; + } + @Override public void onSubscribe(Disposable d) { - sd.update(d); + DisposableHelper.replace(this, d); } @Override public void onError(Throwable e) { - innerError(e); + parent.innerError(e); } @Override public void onComplete() { - innerComplete(); + parent.innerComplete(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableMerge.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableMerge.java index c63bb0f161..e4c21272c5 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableMerge.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableMerge.java @@ -19,6 +19,7 @@ import io.reactivex.*; import io.reactivex.disposables.*; +import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; @@ -52,22 +53,16 @@ static final class CompletableMergeSubscriber final AtomicThrowable error; - final AtomicBoolean once; - final CompositeDisposable set; Subscription s; - volatile boolean done; - - CompletableMergeSubscriber(CompletableObserver actual, int maxConcurrency, boolean delayErrors) { this.actual = actual; this.maxConcurrency = maxConcurrency; this.delayErrors = delayErrors; this.set = new CompositeDisposable(); this.error = new AtomicThrowable(); - this.once = new AtomicBoolean(); lazySet(1); } @@ -97,99 +92,120 @@ public void onSubscribe(Subscription s) { @Override public void onNext(CompletableSource t) { - if (done) { - return; - } - getAndIncrement(); - t.subscribe(new InnerObserver()); + MergeInnerObserver inner = new MergeInnerObserver(); + set.add(inner); + t.subscribe(inner); } @Override public void onError(Throwable t) { - if (done || !error.addThrowable(t)) { - RxJavaPlugins.onError(t); - return; - } + if (!delayErrors) { + set.dispose(); - done = true; - terminate(); + if (error.addThrowable(t)) { + if (getAndSet(0) > 0) { + actual.onError(error.terminate()); + } + } else { + RxJavaPlugins.onError(t); + } + } else { + if (error.addThrowable(t)) { + if (decrementAndGet() == 0) { + actual.onError(error.terminate()); + } + } else { + RxJavaPlugins.onError(t); + } + } } @Override public void onComplete() { - if (done) { - return; - } - done = true; - terminate(); - } - - void terminate() { if (decrementAndGet() == 0) { Throwable ex = error.get(); - if (ex == null) { - actual.onComplete(); - } else { + if (ex != null) { actual.onError(error.terminate()); + } else { + actual.onComplete(); } } - else if (!delayErrors) { - Throwable ex = error.get(); - if (ex != null && ex != ExceptionHelper.TERMINATED) { - s.cancel(); - set.dispose(); - if (once.compareAndSet(false, true)) { + } + + void innerError(MergeInnerObserver inner, Throwable t) { + set.delete(inner); + if (!delayErrors) { + s.cancel(); + set.dispose(); + + if (error.addThrowable(t)) { + if (getAndSet(0) > 0) { + actual.onError(error.terminate()); + } + } else { + RxJavaPlugins.onError(t); + } + } else { + if (error.addThrowable(t)) { + if (decrementAndGet() == 0) { actual.onError(error.terminate()); } else { - RxJavaPlugins.onError(ex); + if (maxConcurrency != Integer.MAX_VALUE) { + s.request(1); + } } + } else { + RxJavaPlugins.onError(t); + } + } + } + + void innerComplete(MergeInnerObserver inner) { + set.delete(inner); + if (decrementAndGet() == 0) { + Throwable ex = error.get(); + if (ex != null) { + actual.onError(ex); + } else { + actual.onComplete(); + } + } else { + if (maxConcurrency != Integer.MAX_VALUE) { + s.request(1); } } } - final class InnerObserver implements CompletableObserver { - Disposable d; - boolean innerDone; + final class MergeInnerObserver + extends AtomicReference + implements CompletableObserver, Disposable { + private static final long serialVersionUID = 251330541679988317L; @Override public void onSubscribe(Disposable d) { - this.d = d; - set.add(d); + DisposableHelper.setOnce(this, d); } @Override public void onError(Throwable e) { - if (innerDone || !error.addThrowable(e)) { - RxJavaPlugins.onError(e); - return; - } - - set.delete(d); - innerDone = true; - - terminate(); - - if (delayErrors && !done) { - s.request(1); - } + innerError(this, e); } @Override public void onComplete() { - if (innerDone) { - return; - } - - set.delete(d); - innerDone = true; + innerComplete(this); + } - terminate(); + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } - if (!done) { - s.request(1); - } + @Override + public void dispose() { + DisposableHelper.dispose(this); } } } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeArray.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeArray.java index 93dee69067..1c1a2faa71 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeArray.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeArray.java @@ -29,9 +29,9 @@ public CompletableMergeArray(CompletableSource[] sources) { @Override public void subscribeActual(final CompletableObserver s) { final CompositeDisposable set = new CompositeDisposable(); - final AtomicInteger wip = new AtomicInteger(sources.length + 1); final AtomicBoolean once = new AtomicBoolean(); + InnerCompletableObserver shared = new InnerCompletableObserver(s, once, set, sources.length + 1); s.onSubscribe(set); for (CompletableSource c : sources) { @@ -42,45 +42,53 @@ public void subscribeActual(final CompletableObserver s) { if (c == null) { set.dispose(); NullPointerException npe = new NullPointerException("A completable source is null"); - if (once.compareAndSet(false, true)) { - s.onError(npe); - } else { - RxJavaPlugins.onError(npe); - } + shared.onError(npe); return; } - c.subscribe(new CompletableObserver() { - @Override - public void onSubscribe(Disposable d) { - set.add(d); - } + c.subscribe(shared); + } - @Override - public void onError(Throwable e) { - set.dispose(); - if (once.compareAndSet(false, true)) { - s.onError(e); - } else { - RxJavaPlugins.onError(e); - } - } + shared.onComplete(); + } - @Override - public void onComplete() { - if (wip.decrementAndGet() == 0) { - if (once.compareAndSet(false, true)) { - s.onComplete(); - } - } - } + static final class InnerCompletableObserver extends AtomicInteger implements CompletableObserver { + private static final long serialVersionUID = -8360547806504310570L; + + final CompletableObserver actual; + + final AtomicBoolean once; - }); + final CompositeDisposable set; + + InnerCompletableObserver(CompletableObserver actual, AtomicBoolean once, CompositeDisposable set, int n) { + this.actual = actual; + this.once = once; + this.set = set; + this.lazySet(n); + } + + @Override + public void onSubscribe(Disposable d) { + set.add(d); } - if (wip.decrementAndGet() == 0) { + @Override + public void onError(Throwable e) { + set.dispose(); if (once.compareAndSet(false, true)) { - s.onComplete(); + actual.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + if (decrementAndGet() == 0) { + if (once.compareAndSet(false, true)) { + actual.onComplete(); + } } } } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java index e2ef6d7c3c..55b952b5de 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java @@ -19,6 +19,7 @@ import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.Exceptions; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.plugins.RxJavaPlugins; public final class CompletableMergeIterable extends Completable { @@ -50,8 +51,8 @@ public void subscribeActual(final CompletableObserver s) { } final AtomicInteger wip = new AtomicInteger(1); - final AtomicBoolean once = new AtomicBoolean(); + MergeCompletableObserver shared = new MergeCompletableObserver(s, set, wip); for (;;) { if (set.isDisposed()) { return; @@ -63,11 +64,7 @@ public void subscribeActual(final CompletableObserver s) { } catch (Throwable e) { Exceptions.throwIfFatal(e); set.dispose(); - if (once.compareAndSet(false, true)) { - s.onError(e); - } else { - RxJavaPlugins.onError(e); - } + shared.onError(e); return; } @@ -82,15 +79,11 @@ public void subscribeActual(final CompletableObserver s) { CompletableSource c; try { - c = iterator.next(); + c = ObjectHelper.requireNonNull(iterator.next(), "The iterator returned a null CompletableSource"); } catch (Throwable e) { Exceptions.throwIfFatal(e); set.dispose(); - if (once.compareAndSet(false, true)) { - s.onError(e); - } else { - RxJavaPlugins.onError(e); - } + shared.onError(e); return; } @@ -98,50 +91,51 @@ public void subscribeActual(final CompletableObserver s) { return; } - if (c == null) { - set.dispose(); - NullPointerException npe = new NullPointerException("A completable source is null"); - if (once.compareAndSet(false, true)) { - s.onError(npe); - } else { - RxJavaPlugins.onError(npe); - } - return; - } - wip.getAndIncrement(); - c.subscribe(new CompletableObserver() { - @Override - public void onSubscribe(Disposable d) { - set.add(d); - } + c.subscribe(shared); + } - @Override - public void onError(Throwable e) { - set.dispose(); - if (once.compareAndSet(false, true)) { - s.onError(e); - } else { - RxJavaPlugins.onError(e); - } - } + shared.onComplete(); + } - @Override - public void onComplete() { - if (wip.decrementAndGet() == 0) { - if (once.compareAndSet(false, true)) { - s.onComplete(); - } - } - } + static final class MergeCompletableObserver extends AtomicBoolean implements CompletableObserver { + + private static final long serialVersionUID = -7730517613164279224L; + + final CompositeDisposable set; - }); + final CompletableObserver actual; + + final AtomicInteger wip; + + MergeCompletableObserver(CompletableObserver actual, CompositeDisposable set, AtomicInteger wip) { + this.actual = actual; + this.set = set; + this.wip = wip; } - if (wip.decrementAndGet() == 0) { - if (once.compareAndSet(false, true)) { - s.onComplete(); + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onError(Throwable e) { + set.dispose(); + if (compareAndSet(false, true)) { + actual.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + if (wip.decrementAndGet() == 0) { + if (compareAndSet(false, true)) { + actual.onComplete(); + } } } } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableObserveOn.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableObserveOn.java index d2936a3bea..7f4dafae9f 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableObserveOn.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableObserveOn.java @@ -13,9 +13,11 @@ package io.reactivex.internal.operators.completable; +import java.util.concurrent.atomic.AtomicReference; + import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.ArrayCompositeDisposable; +import io.reactivex.internal.disposables.DisposableHelper; public final class CompletableObserveOn extends Completable { @@ -29,49 +31,65 @@ public CompletableObserveOn(CompletableSource source, Scheduler scheduler) { @Override protected void subscribeActual(final CompletableObserver s) { + source.subscribe(new ObserveOnCompletableObserver(s, scheduler)); + } - final ArrayCompositeDisposable ad = new ArrayCompositeDisposable(2); - final Scheduler.Worker w = scheduler.createWorker(); - ad.set(0, w); - - s.onSubscribe(ad); - - source.subscribe(new CompletableObserver() { - - @Override - public void onComplete() { - w.schedule(new Runnable() { - @Override - public void run() { - try { - s.onComplete(); - } finally { - ad.dispose(); - } - } - }); - } + static final class ObserveOnCompletableObserver + extends AtomicReference + implements CompletableObserver, Disposable, Runnable { - @Override - public void onError(final Throwable e) { - w.schedule(new Runnable() { - @Override - public void run() { - try { - s.onError(e); - } finally { - ad.dispose(); - } - } - }); - } - @Override - public void onSubscribe(Disposable d) { - ad.set(1, d); + private static final long serialVersionUID = 8571289934935992137L; + + final CompletableObserver actual; + + final Scheduler scheduler; + + Throwable error; + + ObserveOnCompletableObserver(CompletableObserver actual, Scheduler scheduler) { + this.actual = actual; + this.scheduler = scheduler; + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.setOnce(this, d)) { + actual.onSubscribe(this); } + } - }); + @Override + public void onError(Throwable e) { + this.error = e; + DisposableHelper.replace(this, scheduler.scheduleDirect(this)); + } + + @Override + public void onComplete() { + DisposableHelper.replace(this, scheduler.scheduleDirect(this)); + } + + @Override + public void run() { + Throwable ex = error; + if (ex != null) { + error = null; + actual.onError(ex); + } else { + actual.onComplete(); + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletablePeek.java b/src/main/java/io/reactivex/internal/operators/completable/CompletablePeek.java index f2ac9339bc..1b6f222dd9 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletablePeek.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletablePeek.java @@ -63,12 +63,7 @@ public void onComplete() { s.onComplete(); - try { - onAfterTerminate.run(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - RxJavaPlugins.onError(e); - } + doAfter(); } @Override @@ -78,17 +73,12 @@ public void onError(Throwable e) { onTerminate.run(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - e = new CompositeException(ex, e); + e = new CompositeException(e, ex); } s.onError(e); - try { - onAfterTerminate.run(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - RxJavaPlugins.onError(ex); - } + doAfter(); } @Override @@ -117,6 +107,16 @@ public void run() { })); } + void doAfter() { + + try { + onAfterTerminate.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + + } }); } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleResumeNext.java b/src/main/java/io/reactivex/internal/operators/single/SingleResumeNext.java index 0b0730f387..b91bc1b67b 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleResumeNext.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleResumeNext.java @@ -47,7 +47,7 @@ static final class ResumeMainSingleObserver extends AtomicReference> nextFunction; - public ResumeMainSingleObserver(SingleObserver actual, + ResumeMainSingleObserver(SingleObserver actual, Function> nextFunction) { this.actual = actual; this.nextFunction = nextFunction; diff --git a/src/main/java/io/reactivex/observers/BaseTestConsumer.java b/src/main/java/io/reactivex/observers/BaseTestConsumer.java index f11695efb7..b05f151128 100644 --- a/src/main/java/io/reactivex/observers/BaseTestConsumer.java +++ b/src/main/java/io/reactivex/observers/BaseTestConsumer.java @@ -324,6 +324,7 @@ public final U assertValue(Predicate valuePredicate) { /** * Asserts that this TestObserver/TestSubscriber received an onNext value at the given index * for the provided predicate returns true. + * @param index the position to assert on * @param valuePredicate * the predicate that receives the onNext value * and should return true for the expected value. diff --git a/src/test/java/io/reactivex/completable/CompletableTest.java b/src/test/java/io/reactivex/completable/CompletableTest.java index 66f8ee0f53..cc8ac571d9 100644 --- a/src/test/java/io/reactivex/completable/CompletableTest.java +++ b/src/test/java/io/reactivex/completable/CompletableTest.java @@ -366,8 +366,7 @@ public void accept(long v) { c.blockingAwait(); - // FIXME this request pattern looks odd because all 10 completions trigger 1 requests - Assert.assertEquals(Arrays.asList(5L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L), requested); + Assert.assertEquals(Arrays.asList(5L, 4L, 4L), requested); } @Test(expected = NullPointerException.class) @@ -1876,8 +1875,8 @@ public void doOnErrorThrows() { } catch (CompositeException ex) { List a = ex.getExceptions(); Assert.assertEquals(2, a.size()); - Assert.assertTrue(a.get(0) instanceof IllegalStateException); - Assert.assertTrue(a.get(1) instanceof TestException); + Assert.assertTrue(a.get(0) instanceof TestException); + Assert.assertTrue(a.get(1) instanceof IllegalStateException); } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java new file mode 100644 index 0000000000..83f42f829c --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java @@ -0,0 +1,148 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import static org.junit.Assert.*; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.*; + +public class CompletableAmbTest { + + @Test + public void ambLots() { + List ms = new ArrayList(); + + for (int i = 0; i < 32; i++) { + ms.add(Completable.never()); + } + + ms.add(Completable.complete()); + + Completable.amb(ms) + .test() + .assertResult(); + } + + @Test + public void ambFirstDone() { + Completable.amb(Arrays.asList(Completable.complete(), Completable.complete())) + .test() + .assertResult(); + } + + @Test + public void dispose() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestObserver to = Completable.amb(Arrays.asList(pp1.ignoreElements(), pp2.ignoreElements())) + .test(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + to.dispose(); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + } + + @Test + public void innerErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp0 = PublishProcessor.create(); + final PublishProcessor pp1 = PublishProcessor.create(); + + final TestObserver to = Completable.amb(Arrays.asList(pp0.ignoreElements(), pp1.ignoreElements())) + .test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp0.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + pp1.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertFailure(TestException.class); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void nullSourceSuccessRace() { + for (int i = 0; i < 1000; i++) { + List errors = TestHelper.trackPluginErrors(); + + try { + + final Subject ps = ReplaySubject.create(); + ps.onNext(1); + + final Completable source = Completable.ambArray(ps.ignoreElements(), Completable.never(), Completable.never(), null); + + Runnable r1 = new Runnable() { + @Override + public void run() { + source.test(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, NullPointerException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableConcatTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableConcatTest.java index c0884badbb..4b50820b31 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableConcatTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableConcatTest.java @@ -14,12 +14,21 @@ package io.reactivex.internal.operators.completable; import static org.junit.Assert.*; + +import java.util.*; + import org.junit.Test; import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.exceptions.MissingBackpressureException; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.*; +import io.reactivex.schedulers.Schedulers; public class CompletableConcatTest { @@ -51,4 +60,198 @@ public void invalidPrefetch() { assertEquals("prefetch > 0 required but it was -99", ex.getMessage()); } } + + @Test + public void dispose() { + TestHelper.checkDisposed(Completable.concat(Flowable.just(Completable.complete()))); + } + + @Test + public void errorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + + try { + final PublishProcessor ps1 = PublishProcessor.create(); + final PublishProcessor ps2 = PublishProcessor.create(); + + TestObserver to = Completable.concat(ps1.map(new Function() { + @Override + public Completable apply(Integer v) throws Exception { + return ps2.ignoreElements(); + } + })).test(); + + ps1.onNext(1); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onError(ex); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ps2.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertFailure(TestException.class); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void synchronousFusedCrash() { + Completable.concat(Flowable.range(1, 2).map(new Function() { + @Override + public Completable apply(Integer v) throws Exception { + throw new TestException(); + } + })) + .test() + .assertFailure(TestException.class); + } + + @Test + public void unboundedIn() { + Completable.concat(Flowable.just(Completable.complete()).hide(), Integer.MAX_VALUE) + .test() + .assertResult(); + } + + @Test + public void syncFusedUnboundedIn() { + Completable.concat(Flowable.just(Completable.complete()), Integer.MAX_VALUE) + .test() + .assertResult(); + } + + @Test + public void asyncFusedUnboundedIn() { + UnicastProcessor up = UnicastProcessor.create(); + up.onNext(Completable.complete()); + up.onComplete(); + + Completable.concat(up, Integer.MAX_VALUE) + .test() + .assertResult(); + } + + @Test + public void arrayCancelled() { + Completable.concatArray(Completable.complete(), Completable.complete()) + .test(true) + .assertEmpty(); + } + + @Test + public void arrayFirstCancels() { + final TestObserver to = new TestObserver(); + + Completable.concatArray(new Completable() { + @Override + protected void subscribeActual(CompletableObserver s) { + s.onSubscribe(Disposables.empty()); + to.cancel(); + s.onComplete(); + } + }, Completable.complete()) + .subscribe(to); + + to.assertEmpty(); + } + + @Test + public void iterableCancelled() { + Completable.concat(Arrays.asList(Completable.complete(), Completable.complete())) + .test(true) + .assertEmpty(); + } + + @Test + public void iterableFirstCancels() { + final TestObserver to = new TestObserver(); + + Completable.concat(Arrays.asList(new Completable() { + @Override + protected void subscribeActual(CompletableObserver s) { + s.onSubscribe(Disposables.empty()); + to.cancel(); + s.onComplete(); + } + }, Completable.complete())) + .subscribe(to); + + to.assertEmpty(); + } + + @Test + public void arrayCancelRace() { + Completable[] a = new Completable[1024]; + Arrays.fill(a, Completable.complete()); + + for (int i = 0; i < 500; i++) { + + final Completable c = Completable.concatArray(a); + + final TestObserver to = new TestObserver(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + c.subscribe(to); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } + + @Test + public void iterableCancelRace() { + Completable[] a = new Completable[1024]; + Arrays.fill(a, Completable.complete()); + + for (int i = 0; i < 500; i++) { + + final Completable c = Completable.concat(Arrays.asList(a)); + + final TestObserver to = new TestObserver(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + c.subscribe(to); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableMergeIterableTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableMergeIterableTest.java new file mode 100644 index 0000000000..3076002ccc --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableMergeIterableTest.java @@ -0,0 +1,127 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; + +public class CompletableMergeIterableTest { + + @Test + public void errorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + + TestObserver to = Completable.merge( + Arrays.asList(ps1.ignoreElements(), ps2.ignoreElements())).test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onError(ex); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ps2.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertFailure(TestException.class); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void cancelAfterHasNext() { + final TestObserver to = new TestObserver(); + + Completable.merge(new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + to.cancel(); + return true; + } + + @Override + public Completable next() { + return Completable.complete(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }).subscribe(to); + + to.assertEmpty(); + } + + @Test + public void cancelAfterNext() { + final TestObserver to = new TestObserver(); + + Completable.merge(new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + return true; + } + + @Override + public Completable next() { + to.cancel(); + return Completable.complete(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }).subscribe(to); + + to.assertEmpty(); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableMergeTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableMergeTest.java index 565770ab05..2e091a17bb 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableMergeTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableMergeTest.java @@ -15,16 +15,20 @@ import static org.junit.Assert.*; -import java.util.List; +import java.util.*; import org.junit.Test; +import org.reactivestreams.Subscriber; import io.reactivex.*; -import io.reactivex.disposables.Disposables; -import io.reactivex.exceptions.TestException; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; public class CompletableMergeTest { @Test @@ -131,4 +135,422 @@ public void errorAfterMainDelayError() { to.assertFailure(TestException.class); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Completable.merge(Flowable.just(Completable.complete()))); + } + + @Test + public void disposePropagates() { + + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = Completable.merge(Flowable.just(pp.ignoreElements())).test(); + + assertTrue(pp.hasSubscribers()); + + to.cancel(); + + assertFalse(pp.hasSubscribers()); + + to.assertEmpty(); + } + + @Test + public void innerComplete() { + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = Completable.merge(Flowable.just(pp.ignoreElements())).test(); + + pp.onComplete(); + + to.assertResult(); + } + + @Test + public void innerError() { + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = Completable.merge(Flowable.just(pp.ignoreElements())).test(); + + pp.onError(new TestException()); + + to.assertFailure(TestException.class); + } + + @Test + public void innerErrorDelayError() { + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = Completable.mergeDelayError(Flowable.just(pp.ignoreElements())).test(); + + pp.onError(new TestException()); + + to.assertFailure(TestException.class); + } + + @Test + public void mainErrorInnerErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp1 = PublishProcessor.create(); + final PublishProcessor pp2 = PublishProcessor.create(); + + TestObserver to = Completable.merge(pp1.map(new Function() { + @Override + public Completable apply(Integer v) throws Exception { + return pp2.ignoreElements(); + } + })).test(); + + pp1.onNext(1); + + final Throwable ex1 = new TestException(); + final Throwable ex2 = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp1.onError(ex1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + pp2.onError(ex2); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + Throwable ex = to.errors().get(0); + if (ex instanceof CompositeException) { + to.assertSubscribed().assertNoValues().assertNotComplete(); + + errors = TestHelper.compositeList(ex); + TestHelper.assertError(errors, 0, TestException.class); + TestHelper.assertError(errors, 1, TestException.class); + } else { + to.assertFailure(TestException.class); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void mainErrorInnerErrorDelayedRace() { + for (int i = 0; i < 500; i++) { + final PublishProcessor pp1 = PublishProcessor.create(); + final PublishProcessor pp2 = PublishProcessor.create(); + + TestObserver to = Completable.mergeDelayError(pp1.map(new Function() { + @Override + public Completable apply(Integer v) throws Exception { + return pp2.ignoreElements(); + } + })).test(); + + pp1.onNext(1); + + final Throwable ex1 = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp1.onError(ex1); + } + }; + + final Throwable ex2 = new TestException(); + Runnable r2 = new Runnable() { + @Override + public void run() { + pp2.onError(ex2); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(errors, 0, TestException.class); + TestHelper.assertError(errors, 1, TestException.class); + } + } + + @Test + public void maxConcurrencyOne() { + final PublishProcessor pp1 = PublishProcessor.create(); + final PublishProcessor pp2 = PublishProcessor.create(); + + TestObserver to = Completable.merge(Flowable.just(pp1.ignoreElements(), pp2.ignoreElements()), 1) + .test(); + + assertTrue(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + pp1.onComplete(); + + assertTrue(pp2.hasSubscribers()); + + pp2.onComplete(); + + to.assertResult(); + } + + @Test + public void maxConcurrencyOneDelayError() { + final PublishProcessor pp1 = PublishProcessor.create(); + final PublishProcessor pp2 = PublishProcessor.create(); + + TestObserver to = Completable.mergeDelayError(Flowable.just(pp1.ignoreElements(), pp2.ignoreElements()), 1) + .test(); + + assertTrue(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + pp1.onComplete(); + + assertTrue(pp2.hasSubscribers()); + + pp2.onComplete(); + + to.assertResult(); + } + + @Test + public void maxConcurrencyOneDelayErrorFirst() { + final PublishProcessor pp1 = PublishProcessor.create(); + final PublishProcessor pp2 = PublishProcessor.create(); + + TestObserver to = Completable.mergeDelayError(Flowable.just(pp1.ignoreElements(), pp2.ignoreElements()), 1) + .test(); + + assertTrue(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + pp1.onError(new TestException()); + + assertTrue(pp2.hasSubscribers()); + + pp2.onComplete(); + + to.assertFailure(TestException.class); + } + + @Test + public void maxConcurrencyOneDelayMainErrors() { + final PublishProcessor> pp0 = PublishProcessor.create(); + final PublishProcessor pp1 = PublishProcessor.create(); + final PublishProcessor pp2 = PublishProcessor.create(); + + TestObserver to = Completable.mergeDelayError( + pp0.map(new Function, Completable>() { + @Override + public Completable apply(PublishProcessor v) throws Exception { + return v.ignoreElements(); + } + }), 1) + .test(); + + pp0.onNext(pp1); + + assertTrue(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + pp1.onComplete(); + + pp0.onNext(pp2); + pp0.onError(new TestException()); + + assertTrue(pp2.hasSubscribers()); + + pp2.onComplete(); + + to.assertFailure(TestException.class); + } + + @Test + public void mainDoubleOnError() { + List errors = TestHelper.trackPluginErrors(); + try { + Completable.mergeDelayError(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(Completable.complete()); + s.onError(new TestException("First")); + s.onError(new TestException("Second")); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void innerDoubleOnError() { + List errors = TestHelper.trackPluginErrors(); + try { + final CompletableObserver[] o = { null }; + Completable.mergeDelayError(Flowable.just(new Completable() { + @Override + protected void subscribeActual(CompletableObserver s) { + s.onSubscribe(Disposables.empty()); + s.onError(new TestException("First")); + o[0] = s; + } + })) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + o[0].onError(new TestException("Second")); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void innerIsDisposed() { + final TestObserver to = new TestObserver(); + + Completable.mergeDelayError(Flowable.just(new Completable() { + @Override + protected void subscribeActual(CompletableObserver s) { + s.onSubscribe(Disposables.empty()); + assertFalse(((Disposable)s).isDisposed()); + + to.dispose(); + + assertTrue(((Disposable)s).isDisposed()); + } + })) + .subscribe(to); + } + + @Test + public void mergeArrayInnerErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp1 = PublishProcessor.create(); + final PublishProcessor pp2 = PublishProcessor.create(); + + TestObserver to = Completable.mergeArray(pp1.ignoreElements(), pp2.ignoreElements()).test(); + + pp1.onNext(1); + + final Throwable ex1 = new TestException(); + final Throwable ex2 = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp1.onError(ex1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + pp2.onError(ex2); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertFailure(TestException.class); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void delayErrorIterableCancel() { + Completable.mergeDelayError(Arrays.asList(Completable.complete())) + .test(true) + .assertEmpty(); + } + + @Test + public void delayErrorIterableCancelAfterHasNext() { + final TestObserver to = new TestObserver(); + + Completable.mergeDelayError(new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + to.cancel(); + return true; + } + + @Override + public Completable next() { + return Completable.complete(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }) + .subscribe(to); + + to.assertEmpty(); + } + + @Test + public void delayErrorIterableCancelAfterNext() { + final TestObserver to = new TestObserver(); + + Completable.mergeDelayError(new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + return true; + } + + @Override + public Completable next() { + to.cancel(); + return Completable.complete(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }) + .subscribe(to); + + to.assertEmpty(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableObserveOnTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableObserveOnTest.java new file mode 100644 index 0000000000..8fbc51d97c --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableObserveOnTest.java @@ -0,0 +1,38 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.functions.Function; +import io.reactivex.schedulers.Schedulers; + +public class CompletableObserveOnTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(Completable.complete().observeOn(Schedulers.single())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeCompletable(new Function() { + @Override + public CompletableSource apply(Completable c) throws Exception { + return c.observeOn(Schedulers.single()); + } + }); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletablePeekTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletablePeekTest.java new file mode 100644 index 0000000000..ace99649ee --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletablePeekTest.java @@ -0,0 +1,46 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Action; +import io.reactivex.plugins.RxJavaPlugins; + +public class CompletablePeekTest { + + @Test + public void onAfterTerminateCrashes() { + List errors = TestHelper.trackPluginErrors(); + try { + Completable.complete() + .doAfterTerminate(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .test() + .assertResult(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java index d9af3429b1..b27d73d54d 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java @@ -273,7 +273,7 @@ public void onNext(String v) { // to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following // onNext is invoked. - int timeout = 10; + int timeout = 20; while (timeout-- > 0 && concurrentCounter.get() != 1) { Thread.sleep(100);