diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java index baf19d0665..6663aec4c0 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java @@ -33,8 +33,14 @@ public OperatorDoOnEach(Observer doOnEachObserver) { @Override public Subscriber call(final Subscriber observer) { return new Subscriber(observer) { + + private boolean done = false; + @Override public void onCompleted() { + if (done) { + return; + } try { doOnEachObserver.onCompleted(); } catch (Throwable e) { @@ -42,10 +48,15 @@ public void onCompleted() { return; } observer.onCompleted(); + // Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer + done = true; } @Override public void onError(Throwable e) { + if (done) { + return; + } try { doOnEachObserver.onError(e); } catch (Throwable e2) { @@ -53,10 +64,14 @@ public void onError(Throwable e) { return; } observer.onError(e); + done = true; } @Override public void onNext(T value) { + if (done) { + return; + } try { doOnEachObserver.onNext(value); } catch (Throwable e) { diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java index c3e4410096..9a5a533bae 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java @@ -15,6 +15,7 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -30,6 +31,9 @@ import rx.functions.Action1; import rx.functions.Func1; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + public class OperatorDoOnEachTest { @Mock @@ -114,4 +118,55 @@ public void call(String s) { } + @Test + public void testIssue1451Case1() { + // https://github.com/Netflix/RxJava/issues/1451 + int[] nums = {1, 2, 3}; + final AtomicInteger count = new AtomicInteger(); + for (final int n : nums) { + Observable + .from(Boolean.TRUE, Boolean.FALSE) + .takeWhile(new Func1() { + @Override + public Boolean call(Boolean value) { + return value; + } + }) + .toList() + .doOnNext(new Action1>() { + @Override + public void call(List booleans) { + count.incrementAndGet(); + } + }) + .subscribe(); + } + assertEquals(nums.length, count.get()); + } + + @Test + public void testIssue1451Case2() { + // https://github.com/Netflix/RxJava/issues/1451 + int[] nums = {1, 2, 3}; + final AtomicInteger count = new AtomicInteger(); + for (final int n : nums) { + Observable + .from(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE) + .takeWhile(new Func1() { + @Override + public Boolean call(Boolean value) { + return value; + } + }) + .toList() + .doOnNext(new Action1>() { + @Override + public void call(List booleans) { + count.incrementAndGet(); + } + }) + .subscribe(); + } + assertEquals(nums.length, count.get()); + } }