From 53b46595de4711b3fa6a2d8c5a8750efd341882d Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 17 Jul 2014 10:02:09 +0800 Subject: [PATCH 1/2] Fix issue #1451 --- .../internal/operators/OperatorDoOnEach.java | 15 +++++ .../operators/OperatorDoOnEachTest.java | 55 +++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java index baf19d0665..6663aec4c0 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java @@ -33,8 +33,14 @@ public OperatorDoOnEach(Observer doOnEachObserver) { @Override public Subscriber call(final Subscriber observer) { return new Subscriber(observer) { + + private boolean done = false; + @Override public void onCompleted() { + if (done) { + return; + } try { doOnEachObserver.onCompleted(); } catch (Throwable e) { @@ -42,10 +48,15 @@ public void onCompleted() { return; } observer.onCompleted(); + // Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer + done = true; } @Override public void onError(Throwable e) { + if (done) { + return; + } try { doOnEachObserver.onError(e); } catch (Throwable e2) { @@ -53,10 +64,14 @@ public void onError(Throwable e) { return; } observer.onError(e); + done = true; } @Override public void onNext(T value) { + if (done) { + return; + } try { doOnEachObserver.onNext(value); } catch (Throwable e) { diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java index c3e4410096..9a5a533bae 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java @@ -15,6 +15,7 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -30,6 +31,9 @@ import rx.functions.Action1; import rx.functions.Func1; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + public class OperatorDoOnEachTest { @Mock @@ -114,4 +118,55 @@ public void call(String s) { } + @Test + public void testIssue1451Case1() { + // https://github.com/Netflix/RxJava/issues/1451 + int[] nums = {1, 2, 3}; + final AtomicInteger count = new AtomicInteger(); + for (final int n : nums) { + Observable + .from(Boolean.TRUE, Boolean.FALSE) + .takeWhile(new Func1() { + @Override + public Boolean call(Boolean value) { + return value; + } + }) + .toList() + .doOnNext(new Action1>() { + @Override + public void call(List booleans) { + count.incrementAndGet(); + } + }) + .subscribe(); + } + assertEquals(nums.length, count.get()); + } + + @Test + public void testIssue1451Case2() { + // https://github.com/Netflix/RxJava/issues/1451 + int[] nums = {1, 2, 3}; + final AtomicInteger count = new AtomicInteger(); + for (final int n : nums) { + Observable + .from(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE) + .takeWhile(new Func1() { + @Override + public Boolean call(Boolean value) { + return value; + } + }) + .toList() + .doOnNext(new Action1>() { + @Override + public void call(List booleans) { + count.incrementAndGet(); + } + }) + .subscribe(); + } + assertEquals(nums.length, count.get()); + } } From 6bbd921cd4e0da242b2fa97a80fdf488ebf70e44 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 17 Jul 2014 15:30:02 +0800 Subject: [PATCH 2/2] Fix related classes for issue #1451 --- .../internal/operators/OnSubscribeFromIterable.java | 4 +++- .../java/rx/internal/operators/OnSubscribeRange.java | 4 +++- .../java/rx/internal/operators/OperatorDoOnEach.java | 4 ++-- .../rx/internal/operators/OperatorTakeWhile.java | 12 ++++++++++-- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java index f8821f7f52..83d3646a43 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java @@ -70,7 +70,9 @@ public void request(long n) { } o.onNext(it.next()); } - o.onCompleted(); + if (!o.isUnsubscribed()) { + o.onCompleted(); + } } else if(n > 0) { // backpressure is requested long _c = REQUESTED_UPDATER.getAndAdd(this, n); diff --git a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRange.java b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRange.java index d4be3490eb..376b738925 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRange.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRange.java @@ -64,7 +64,9 @@ public void request(long n) { } o.onNext((int) i); } - o.onCompleted(); + if (!o.isUnsubscribed()) { + o.onCompleted(); + } } else if (n > 0) { // backpressure is requested long _c = REQUESTED_UPDATER.getAndAdd(this, n); diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java index 6663aec4c0..99fab1319a 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java @@ -47,9 +47,9 @@ public void onCompleted() { onError(e); return; } - observer.onCompleted(); // Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer done = true; + observer.onCompleted(); } @Override @@ -57,6 +57,7 @@ public void onError(Throwable e) { if (done) { return; } + done = true; try { doOnEachObserver.onError(e); } catch (Throwable e2) { @@ -64,7 +65,6 @@ public void onError(Throwable e) { return; } observer.onError(e); - done = true; } @Override diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeWhile.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeWhile.java index 740d8762e1..098bf958f0 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeWhile.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeWhile.java @@ -49,12 +49,15 @@ public Subscriber call(final Subscriber subscriber) { private int counter = 0; + private boolean done = false; + @Override public void onNext(T args) { boolean isSelected; try { isSelected = predicate.call(args, counter++); } catch (Throwable e) { + done = true; subscriber.onError(e); unsubscribe(); return; @@ -62,6 +65,7 @@ public void onNext(T args) { if (isSelected) { subscriber.onNext(args); } else { + done = true; subscriber.onCompleted(); unsubscribe(); } @@ -69,12 +73,16 @@ public void onNext(T args) { @Override public void onCompleted() { - subscriber.onCompleted(); + if (!done) { + subscriber.onCompleted(); + } } @Override public void onError(Throwable e) { - subscriber.onError(e); + if (!done) { + subscriber.onError(e); + } } };