diff --git a/src/main/java/io/reactivex/subjects/UnicastSubject.java b/src/main/java/io/reactivex/subjects/UnicastSubject.java index f5570772bb..070c4f301a 100644 --- a/src/main/java/io/reactivex/subjects/UnicastSubject.java +++ b/src/main/java/io/reactivex/subjects/UnicastSubject.java @@ -153,6 +153,13 @@ protected void subscribeActual(Observer observer) { } } + void doTerminate() { + Runnable r = onTerminate.get(); + if (r != null && onTerminate.compareAndSet(r, null)) { + r.run(); + } + } + void notifyOnCancelled() { Runnable r = onCancelled; onCancelled = null; @@ -184,7 +191,7 @@ public void onNext(T t) { return; } if (t == null) { - onError(new NullPointerException()); + onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } queue.offer(t); @@ -198,10 +205,13 @@ public void onError(Throwable t) { return; } if (t == null) { - t = new NullPointerException(); + t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } error = t; done = true; + + doTerminate(); + drain(); } @@ -211,6 +221,9 @@ public void onComplete() { return; } done = true; + + doTerminate(); + drain(); } @@ -374,6 +387,9 @@ public void clear() { public void dispose() { if (!disposed) { disposed = true; + + doTerminate(); + actual.lazySet(null); if (wip.getAndIncrement() == 0) { clearAndNotify(queue); diff --git a/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java b/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java index 0f644d68fe..14476e80f2 100644 --- a/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java +++ b/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java @@ -13,6 +13,10 @@ package io.reactivex.subjects; +import io.reactivex.Observable; +import io.reactivex.disposables.Disposable; +import java.util.concurrent.atomic.AtomicBoolean; +import static org.junit.Assert.assertEquals; import org.junit.Test; import io.reactivex.internal.fuseable.QueueDisposable; @@ -57,4 +61,52 @@ public void fusionOfflie() { .assertOf(ObserverFusion.assertFuseable()) .assertOf(ObserverFusion.assertFusionMode(QueueDisposable.ASYNC)) .assertResult(1); - }} + } + + @Test + public void onTerminateCalledWhenOnError() { + final AtomicBoolean didRunOnTerminate = new AtomicBoolean(); + + UnicastSubject us = UnicastSubject.create(Observable.bufferSize(), new Runnable() { + @Override public void run() { + didRunOnTerminate.set(true); + } + }); + + assertEquals(false, didRunOnTerminate.get()); + us.onError(new RuntimeException("some error")); + assertEquals(true, didRunOnTerminate.get()); + } + + @Test + public void onTerminateCalledWhenOnComplete() { + final AtomicBoolean didRunOnTerminate = new AtomicBoolean(); + + UnicastSubject us = UnicastSubject.create(Observable.bufferSize(), new Runnable() { + @Override public void run() { + didRunOnTerminate.set(true); + } + }); + + assertEquals(false, didRunOnTerminate.get()); + us.onComplete(); + assertEquals(true, didRunOnTerminate.get()); + } + + @Test + public void onTerminateCalledWhenCanceled() { + final AtomicBoolean didRunOnTerminate = new AtomicBoolean(); + + UnicastSubject us = UnicastSubject.create(Observable.bufferSize(), new Runnable() { + @Override public void run() { + didRunOnTerminate.set(true); + } + }); + + final Disposable subscribe = us.subscribe(); + + assertEquals(false, didRunOnTerminate.get()); + subscribe.dispose(); + assertEquals(true, didRunOnTerminate.get()); + } +}