From 794cd19ca33226ab5e68d2789774a3c4ec68af15 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 25 May 2018 10:29:11 +0200 Subject: [PATCH] 2.x: Fix Single.takeUntil, Maybe.takeUntil dispose behavior --- .../maybe/MaybeTakeUntilPublisher.java | 1 + .../operators/single/SingleTakeUntil.java | 1 + .../completable/CompletableAmbTest.java | 74 +++++ .../flowable/FlowableTakeUntilTest.java | 130 ++++++++ .../operators/maybe/MaybeTakeUntilTest.java | 253 +++++++++++++++ .../observable/ObservableTakeUntilTest.java | 131 ++++++++ .../operators/single/SingleTakeUntilTest.java | 291 +++++++++++++++++- 7 files changed, 880 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilPublisher.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilPublisher.java index 6db80da42f..793436526d 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilPublisher.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilPublisher.java @@ -137,6 +137,7 @@ public void onSubscribe(Subscription s) { @Override public void onNext(Object value) { + SubscriptionHelper.cancel(this); parent.otherComplete(); } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java b/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java index 4eaa6f7620..2a4bb1b4b7 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java @@ -69,6 +69,7 @@ static final class TakeUntilMainObserver @Override public void dispose() { DisposableHelper.dispose(this); + other.dispose(); } @Override diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java index 992acdc6e6..f6f3dc0337 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java @@ -186,4 +186,78 @@ public void ambRace() { RxJavaPlugins.reset(); } } + + + @Test + public void untilCompletableMainComplete() { + CompletableSubject main = CompletableSubject.create(); + CompletableSubject other = CompletableSubject.create(); + + TestObserver to = main.ambWith(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + main.onComplete(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertResult(); + } + + @Test + public void untilCompletableMainError() { + CompletableSubject main = CompletableSubject.create(); + CompletableSubject other = CompletableSubject.create(); + + TestObserver to = main.ambWith(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + main.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void untilCompletableOtherOnComplete() { + CompletableSubject main = CompletableSubject.create(); + CompletableSubject other = CompletableSubject.create(); + + TestObserver to = main.ambWith(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + other.onComplete(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertResult(); + } + + @Test + public void untilCompletableOtherError() { + CompletableSubject main = CompletableSubject.create(); + CompletableSubject other = CompletableSubject.create(); + + TestObserver to = main.ambWith(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + other.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertFailure(TestException.class); + } + } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeUntilTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeUntilTest.java index ce7bd20179..be7bc1621b 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeUntilTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeUntilTest.java @@ -20,6 +20,7 @@ import org.reactivestreams.*; import io.reactivex.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; import io.reactivex.processors.PublishProcessor; import io.reactivex.subscribers.TestSubscriber; @@ -293,4 +294,133 @@ public Flowable apply(Flowable c) throws Exception { } }); } + + @Test + public void untilPublisherMainSuccess() { + PublishProcessor main = PublishProcessor.create(); + PublishProcessor other = PublishProcessor.create(); + + TestSubscriber ts = main.takeUntil(other).test(); + + assertTrue("Main no subscribers?", main.hasSubscribers()); + assertTrue("Other no subscribers?", other.hasSubscribers()); + + main.onNext(1); + main.onNext(2); + main.onComplete(); + + assertFalse("Main has subscribers?", main.hasSubscribers()); + assertFalse("Other has subscribers?", other.hasSubscribers()); + + ts.assertResult(1, 2); + } + + @Test + public void untilPublisherMainComplete() { + PublishProcessor main = PublishProcessor.create(); + PublishProcessor other = PublishProcessor.create(); + + TestSubscriber ts = main.takeUntil(other).test(); + + assertTrue("Main no subscribers?", main.hasSubscribers()); + assertTrue("Other no subscribers?", other.hasSubscribers()); + + main.onComplete(); + + assertFalse("Main has subscribers?", main.hasSubscribers()); + assertFalse("Other has subscribers?", other.hasSubscribers()); + + ts.assertResult(); + } + + @Test + public void untilPublisherMainError() { + PublishProcessor main = PublishProcessor.create(); + PublishProcessor other = PublishProcessor.create(); + + TestSubscriber ts = main.takeUntil(other).test(); + + assertTrue("Main no subscribers?", main.hasSubscribers()); + assertTrue("Other no subscribers?", other.hasSubscribers()); + + main.onError(new TestException()); + + assertFalse("Main has subscribers?", main.hasSubscribers()); + assertFalse("Other has subscribers?", other.hasSubscribers()); + + ts.assertFailure(TestException.class); + } + + @Test + public void untilPublisherOtherOnNext() { + PublishProcessor main = PublishProcessor.create(); + PublishProcessor other = PublishProcessor.create(); + + TestSubscriber ts = main.takeUntil(other).test(); + + assertTrue("Main no subscribers?", main.hasSubscribers()); + assertTrue("Other no subscribers?", other.hasSubscribers()); + + other.onNext(1); + + assertFalse("Main has subscribers?", main.hasSubscribers()); + assertFalse("Other has subscribers?", other.hasSubscribers()); + + ts.assertResult(); + } + + @Test + public void untilPublisherOtherOnComplete() { + PublishProcessor main = PublishProcessor.create(); + PublishProcessor other = PublishProcessor.create(); + + TestSubscriber ts = main.takeUntil(other).test(); + + assertTrue("Main no subscribers?", main.hasSubscribers()); + assertTrue("Other no subscribers?", other.hasSubscribers()); + + other.onComplete(); + + assertFalse("Main has subscribers?", main.hasSubscribers()); + assertFalse("Other has subscribers?", other.hasSubscribers()); + + ts.assertResult(); + } + + @Test + public void untilPublisherOtherError() { + PublishProcessor main = PublishProcessor.create(); + PublishProcessor other = PublishProcessor.create(); + + TestSubscriber ts = main.takeUntil(other).test(); + + assertTrue("Main no subscribers?", main.hasSubscribers()); + assertTrue("Other no subscribers?", other.hasSubscribers()); + + other.onError(new TestException()); + + assertFalse("Main has subscribers?", main.hasSubscribers()); + assertFalse("Other has subscribers?", other.hasSubscribers()); + + ts.assertFailure(TestException.class); + } + + @Test + public void untilPublisherDispose() { + PublishProcessor main = PublishProcessor.create(); + PublishProcessor other = PublishProcessor.create(); + + TestSubscriber ts = main.takeUntil(other).test(); + + assertTrue("Main no subscribers?", main.hasSubscribers()); + assertTrue("Other no subscribers?", other.hasSubscribers()); + + ts.dispose(); + + assertFalse("Main has subscribers?", main.hasSubscribers()); + assertFalse("Other has subscribers?", other.hasSubscribers()); + + ts.assertEmpty(); + } + } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilTest.java index 601c123d9e..8cd569869c 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilTest.java @@ -25,6 +25,7 @@ import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; +import io.reactivex.subjects.MaybeSubject; public class MaybeTakeUntilTest { @@ -211,4 +212,256 @@ public void run() { to.assertResult(); } } + + @Test + public void untilMaybeMainSuccess() { + MaybeSubject main = MaybeSubject.create(); + MaybeSubject other = MaybeSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + main.onSuccess(1); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertResult(1); + } + + @Test + public void untilMaybeMainComplete() { + MaybeSubject main = MaybeSubject.create(); + MaybeSubject other = MaybeSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + main.onComplete(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertResult(); + } + + @Test + public void untilMaybeMainError() { + MaybeSubject main = MaybeSubject.create(); + MaybeSubject other = MaybeSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + main.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void untilMaybeOtherSuccess() { + MaybeSubject main = MaybeSubject.create(); + MaybeSubject other = MaybeSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + other.onSuccess(1); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertResult(); + } + + @Test + public void untilMaybeOtherComplete() { + MaybeSubject main = MaybeSubject.create(); + MaybeSubject other = MaybeSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + other.onComplete(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertResult(); + } + + @Test + public void untilMaybeOtherError() { + MaybeSubject main = MaybeSubject.create(); + MaybeSubject other = MaybeSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + other.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void untilMaybeDispose() { + MaybeSubject main = MaybeSubject.create(); + MaybeSubject other = MaybeSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + to.dispose(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertEmpty(); + } + + @Test + public void untilPublisherMainSuccess() { + MaybeSubject main = MaybeSubject.create(); + PublishProcessor other = PublishProcessor.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasSubscribers()); + + main.onSuccess(1); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasSubscribers()); + + to.assertResult(1); + } + + @Test + public void untilPublisherMainComplete() { + MaybeSubject main = MaybeSubject.create(); + PublishProcessor other = PublishProcessor.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasSubscribers()); + + main.onComplete(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasSubscribers()); + + to.assertResult(); + } + + @Test + public void untilPublisherMainError() { + MaybeSubject main = MaybeSubject.create(); + PublishProcessor other = PublishProcessor.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasSubscribers()); + + main.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasSubscribers()); + + to.assertFailure(TestException.class); + } + + @Test + public void untilPublisherOtherOnNext() { + MaybeSubject main = MaybeSubject.create(); + PublishProcessor other = PublishProcessor.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasSubscribers()); + + other.onNext(1); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasSubscribers()); + + to.assertResult(); + } + + @Test + public void untilPublisherOtherOnComplete() { + MaybeSubject main = MaybeSubject.create(); + PublishProcessor other = PublishProcessor.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasSubscribers()); + + other.onComplete(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasSubscribers()); + + to.assertResult(); + } + + @Test + public void untilPublisherOtherError() { + MaybeSubject main = MaybeSubject.create(); + PublishProcessor other = PublishProcessor.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasSubscribers()); + + other.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasSubscribers()); + + to.assertFailure(TestException.class); + } + + @Test + public void untilPublisherDispose() { + MaybeSubject main = MaybeSubject.create(); + PublishProcessor other = PublishProcessor.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasSubscribers()); + + to.dispose(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasSubscribers()); + + to.assertEmpty(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilTest.java index 7a2f139da9..d42e5df389 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilTest.java @@ -20,6 +20,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; import io.reactivex.observers.TestObserver; import io.reactivex.subjects.PublishSubject; @@ -271,4 +272,134 @@ public Observable apply(Observable o) throws Exception { } }); } + + + @Test + public void untilPublisherMainSuccess() { + PublishSubject main = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + main.onNext(1); + main.onNext(2); + main.onComplete(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertResult(1, 2); + } + + @Test + public void untilPublisherMainComplete() { + PublishSubject main = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + main.onComplete(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertResult(); + } + + @Test + public void untilPublisherMainError() { + PublishSubject main = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + main.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void untilPublisherOtherOnNext() { + PublishSubject main = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + other.onNext(1); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertResult(); + } + + @Test + public void untilPublisherOtherOnComplete() { + PublishSubject main = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + other.onComplete(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertResult(); + } + + @Test + public void untilPublisherOtherError() { + PublishSubject main = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + other.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void untilPublisherDispose() { + PublishSubject main = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + to.dispose(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertEmpty(); + } + } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java index 42633a9fcc..d646542a93 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java @@ -13,7 +13,7 @@ package io.reactivex.internal.operators.single; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.util.List; import java.util.concurrent.CancellationException; @@ -27,6 +27,7 @@ import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; +import io.reactivex.subjects.*; public class SingleTakeUntilTest { @@ -291,4 +292,292 @@ protected void subscribeActual(Subscriber s) { .test() .assertFailure(CancellationException.class); } + + @Test + public void untilSingleMainSuccess() { + SingleSubject main = SingleSubject.create(); + SingleSubject other = SingleSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + main.onSuccess(1); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertResult(1); + } + + @Test + public void untilSingleMainError() { + SingleSubject main = SingleSubject.create(); + SingleSubject other = SingleSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + main.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void untilSingleOtherSuccess() { + SingleSubject main = SingleSubject.create(); + SingleSubject other = SingleSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + other.onSuccess(1); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertFailure(CancellationException.class); + } + + @Test + public void untilSingleOtherError() { + SingleSubject main = SingleSubject.create(); + SingleSubject other = SingleSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + other.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void untilSingleDispose() { + SingleSubject main = SingleSubject.create(); + SingleSubject other = SingleSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + to.dispose(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertEmpty(); + } + + @Test + public void untilPublisherMainSuccess() { + SingleSubject main = SingleSubject.create(); + PublishProcessor other = PublishProcessor.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasSubscribers()); + + main.onSuccess(1); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasSubscribers()); + + to.assertResult(1); + } + + @Test + public void untilPublisherMainError() { + SingleSubject main = SingleSubject.create(); + PublishProcessor other = PublishProcessor.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasSubscribers()); + + main.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasSubscribers()); + + to.assertFailure(TestException.class); + } + + @Test + public void untilPublisherOtherOnNext() { + SingleSubject main = SingleSubject.create(); + PublishProcessor other = PublishProcessor.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasSubscribers()); + + other.onNext(1); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasSubscribers()); + + to.assertFailure(CancellationException.class); + } + + @Test + public void untilPublisherOtherOnComplete() { + SingleSubject main = SingleSubject.create(); + PublishProcessor other = PublishProcessor.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasSubscribers()); + + other.onComplete(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasSubscribers()); + + to.assertFailure(CancellationException.class); + } + + @Test + public void untilPublisherOtherError() { + SingleSubject main = SingleSubject.create(); + PublishProcessor other = PublishProcessor.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasSubscribers()); + + other.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasSubscribers()); + + to.assertFailure(TestException.class); + } + + @Test + public void untilPublisherDispose() { + SingleSubject main = SingleSubject.create(); + PublishProcessor other = PublishProcessor.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasSubscribers()); + + to.dispose(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasSubscribers()); + + to.assertEmpty(); + } + + @Test + public void untilCompletableMainSuccess() { + SingleSubject main = SingleSubject.create(); + CompletableSubject other = CompletableSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + main.onSuccess(1); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertResult(1); + } + + @Test + public void untilCompletableMainError() { + SingleSubject main = SingleSubject.create(); + CompletableSubject other = CompletableSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + main.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void untilCompletableOtherOnComplete() { + SingleSubject main = SingleSubject.create(); + CompletableSubject other = CompletableSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + other.onComplete(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertFailure(CancellationException.class); + } + + @Test + public void untilCompletableOtherError() { + SingleSubject main = SingleSubject.create(); + CompletableSubject other = CompletableSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + other.onError(new TestException()); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void untilCompletableDispose() { + SingleSubject main = SingleSubject.create(); + CompletableSubject other = CompletableSubject.create(); + + TestObserver to = main.takeUntil(other).test(); + + assertTrue("Main no observers?", main.hasObservers()); + assertTrue("Other no observers?", other.hasObservers()); + + to.dispose(); + + assertFalse("Main has observers?", main.hasObservers()); + assertFalse("Other has observers?", other.hasObservers()); + + to.assertEmpty(); + } }