Skip to content

Commit

Permalink
2.x: UnicastSubject fix onTerminate (#4592)
Browse files Browse the repository at this point in the history
  • Loading branch information
vanniktech authored and akarnokd committed Sep 23, 2016
1 parent 7791076 commit d923f31
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 3 deletions.
20 changes: 18 additions & 2 deletions src/main/java/io/reactivex/subjects/UnicastSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ protected void subscribeActual(Observer<? super T> observer) {
}
}

void doTerminate() {
Runnable r = onTerminate.get();
if (r != null && onTerminate.compareAndSet(r, null)) {
r.run();
}
}

void notifyOnCancelled() {
Runnable r = onCancelled;
onCancelled = null;
Expand Down Expand Up @@ -184,7 +191,7 @@ public void onNext(T t) {
return;
}
if (t == null) {
onError(new NullPointerException());
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
queue.offer(t);
Expand All @@ -198,10 +205,13 @@ public void onError(Throwable t) {
return;
}
if (t == null) {
t = new NullPointerException();
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
error = t;
done = true;

doTerminate();

drain();
}

Expand All @@ -211,6 +221,9 @@ public void onComplete() {
return;
}
done = true;

doTerminate();

drain();
}

Expand Down Expand Up @@ -374,6 +387,9 @@ public void clear() {
public void dispose() {
if (!disposed) {
disposed = true;

doTerminate();

actual.lazySet(null);
if (wip.getAndIncrement() == 0) {
clearAndNotify(queue);
Expand Down
54 changes: 53 additions & 1 deletion src/test/java/io/reactivex/subjects/UnicastSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

package io.reactivex.subjects;

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import org.junit.Test;

import io.reactivex.internal.fuseable.QueueDisposable;
Expand Down Expand Up @@ -57,4 +61,52 @@ public void fusionOfflie() {
.assertOf(ObserverFusion.<Integer>assertFuseable())
.assertOf(ObserverFusion.<Integer>assertFusionMode(QueueDisposable.ASYNC))
.assertResult(1);
}}
}

@Test
public void onTerminateCalledWhenOnError() {
final AtomicBoolean didRunOnTerminate = new AtomicBoolean();

UnicastSubject<Integer> us = UnicastSubject.create(Observable.bufferSize(), new Runnable() {
@Override public void run() {
didRunOnTerminate.set(true);
}
});

assertEquals(false, didRunOnTerminate.get());
us.onError(new RuntimeException("some error"));
assertEquals(true, didRunOnTerminate.get());
}

@Test
public void onTerminateCalledWhenOnComplete() {
final AtomicBoolean didRunOnTerminate = new AtomicBoolean();

UnicastSubject<Integer> us = UnicastSubject.create(Observable.bufferSize(), new Runnable() {
@Override public void run() {
didRunOnTerminate.set(true);
}
});

assertEquals(false, didRunOnTerminate.get());
us.onComplete();
assertEquals(true, didRunOnTerminate.get());
}

@Test
public void onTerminateCalledWhenCanceled() {
final AtomicBoolean didRunOnTerminate = new AtomicBoolean();

UnicastSubject<Integer> us = UnicastSubject.create(Observable.bufferSize(), new Runnable() {
@Override public void run() {
didRunOnTerminate.set(true);
}
});

final Disposable subscribe = us.subscribe();

assertEquals(false, didRunOnTerminate.get());
subscribe.dispose();
assertEquals(true, didRunOnTerminate.get());
}
}

0 comments on commit d923f31

Please sign in to comment.