Skip to content

Commit

Permalink
2.x: Fix Single.takeUntil, Maybe.takeUntil dispose behavior (#6019)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored May 25, 2018
1 parent b9f5ef8 commit 07e126f
Show file tree
Hide file tree
Showing 7 changed files with 880 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(Object value) {
SubscriptionHelper.cancel(this);
parent.otherComplete();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ static final class TakeUntilMainObserver<T>
@Override
public void dispose() {
DisposableHelper.dispose(this);
other.dispose();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,78 @@ public void ambRace() {
RxJavaPlugins.reset();
}
}


@Test
public void untilCompletableMainComplete() {
CompletableSubject main = CompletableSubject.create();
CompletableSubject other = CompletableSubject.create();

TestObserver<Void> to = main.ambWith(other).test();

assertTrue("Main no observers?", main.hasObservers());
assertTrue("Other no observers?", other.hasObservers());

main.onComplete();

assertFalse("Main has observers?", main.hasObservers());
assertFalse("Other has observers?", other.hasObservers());

to.assertResult();
}

@Test
public void untilCompletableMainError() {
CompletableSubject main = CompletableSubject.create();
CompletableSubject other = CompletableSubject.create();

TestObserver<Void> to = main.ambWith(other).test();

assertTrue("Main no observers?", main.hasObservers());
assertTrue("Other no observers?", other.hasObservers());

main.onError(new TestException());

assertFalse("Main has observers?", main.hasObservers());
assertFalse("Other has observers?", other.hasObservers());

to.assertFailure(TestException.class);
}

@Test
public void untilCompletableOtherOnComplete() {
CompletableSubject main = CompletableSubject.create();
CompletableSubject other = CompletableSubject.create();

TestObserver<Void> to = main.ambWith(other).test();

assertTrue("Main no observers?", main.hasObservers());
assertTrue("Other no observers?", other.hasObservers());

other.onComplete();

assertFalse("Main has observers?", main.hasObservers());
assertFalse("Other has observers?", other.hasObservers());

to.assertResult();
}

@Test
public void untilCompletableOtherError() {
CompletableSubject main = CompletableSubject.create();
CompletableSubject other = CompletableSubject.create();

TestObserver<Void> to = main.ambWith(other).test();

assertTrue("Main no observers?", main.hasObservers());
assertTrue("Other no observers?", other.hasObservers());

other.onError(new TestException());

assertFalse("Main has observers?", main.hasObservers());
assertFalse("Other has observers?", other.hasObservers());

to.assertFailure(TestException.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Function;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subscribers.TestSubscriber;
Expand Down Expand Up @@ -293,4 +294,133 @@ public Flowable<Integer> apply(Flowable<Integer> c) throws Exception {
}
});
}

@Test
public void untilPublisherMainSuccess() {
PublishProcessor<Integer> main = PublishProcessor.create();
PublishProcessor<Integer> other = PublishProcessor.create();

TestSubscriber<Integer> ts = main.takeUntil(other).test();

assertTrue("Main no subscribers?", main.hasSubscribers());
assertTrue("Other no subscribers?", other.hasSubscribers());

main.onNext(1);
main.onNext(2);
main.onComplete();

assertFalse("Main has subscribers?", main.hasSubscribers());
assertFalse("Other has subscribers?", other.hasSubscribers());

ts.assertResult(1, 2);
}

@Test
public void untilPublisherMainComplete() {
PublishProcessor<Integer> main = PublishProcessor.create();
PublishProcessor<Integer> other = PublishProcessor.create();

TestSubscriber<Integer> ts = main.takeUntil(other).test();

assertTrue("Main no subscribers?", main.hasSubscribers());
assertTrue("Other no subscribers?", other.hasSubscribers());

main.onComplete();

assertFalse("Main has subscribers?", main.hasSubscribers());
assertFalse("Other has subscribers?", other.hasSubscribers());

ts.assertResult();
}

@Test
public void untilPublisherMainError() {
PublishProcessor<Integer> main = PublishProcessor.create();
PublishProcessor<Integer> other = PublishProcessor.create();

TestSubscriber<Integer> ts = main.takeUntil(other).test();

assertTrue("Main no subscribers?", main.hasSubscribers());
assertTrue("Other no subscribers?", other.hasSubscribers());

main.onError(new TestException());

assertFalse("Main has subscribers?", main.hasSubscribers());
assertFalse("Other has subscribers?", other.hasSubscribers());

ts.assertFailure(TestException.class);
}

@Test
public void untilPublisherOtherOnNext() {
PublishProcessor<Integer> main = PublishProcessor.create();
PublishProcessor<Integer> other = PublishProcessor.create();

TestSubscriber<Integer> ts = main.takeUntil(other).test();

assertTrue("Main no subscribers?", main.hasSubscribers());
assertTrue("Other no subscribers?", other.hasSubscribers());

other.onNext(1);

assertFalse("Main has subscribers?", main.hasSubscribers());
assertFalse("Other has subscribers?", other.hasSubscribers());

ts.assertResult();
}

@Test
public void untilPublisherOtherOnComplete() {
PublishProcessor<Integer> main = PublishProcessor.create();
PublishProcessor<Integer> other = PublishProcessor.create();

TestSubscriber<Integer> ts = main.takeUntil(other).test();

assertTrue("Main no subscribers?", main.hasSubscribers());
assertTrue("Other no subscribers?", other.hasSubscribers());

other.onComplete();

assertFalse("Main has subscribers?", main.hasSubscribers());
assertFalse("Other has subscribers?", other.hasSubscribers());

ts.assertResult();
}

@Test
public void untilPublisherOtherError() {
PublishProcessor<Integer> main = PublishProcessor.create();
PublishProcessor<Integer> other = PublishProcessor.create();

TestSubscriber<Integer> ts = main.takeUntil(other).test();

assertTrue("Main no subscribers?", main.hasSubscribers());
assertTrue("Other no subscribers?", other.hasSubscribers());

other.onError(new TestException());

assertFalse("Main has subscribers?", main.hasSubscribers());
assertFalse("Other has subscribers?", other.hasSubscribers());

ts.assertFailure(TestException.class);
}

@Test
public void untilPublisherDispose() {
PublishProcessor<Integer> main = PublishProcessor.create();
PublishProcessor<Integer> other = PublishProcessor.create();

TestSubscriber<Integer> ts = main.takeUntil(other).test();

assertTrue("Main no subscribers?", main.hasSubscribers());
assertTrue("Other no subscribers?", other.hasSubscribers());

ts.dispose();

assertFalse("Main has subscribers?", main.hasSubscribers());
assertFalse("Other has subscribers?", other.hasSubscribers());

ts.assertEmpty();
}

}
Loading

0 comments on commit 07e126f

Please sign in to comment.