diff --git a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java index f8821f7f52..83d3646a43 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java @@ -70,7 +70,9 @@ public void request(long n) { } o.onNext(it.next()); } - o.onCompleted(); + if (!o.isUnsubscribed()) { + o.onCompleted(); + } } else if(n > 0) { // backpressure is requested long _c = REQUESTED_UPDATER.getAndAdd(this, n); diff --git a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRange.java b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRange.java index d4be3490eb..376b738925 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRange.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRange.java @@ -64,7 +64,9 @@ public void request(long n) { } o.onNext((int) i); } - o.onCompleted(); + if (!o.isUnsubscribed()) { + o.onCompleted(); + } } else if (n > 0) { // backpressure is requested long _c = REQUESTED_UPDATER.getAndAdd(this, n); diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java index 6663aec4c0..99fab1319a 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java @@ -47,9 +47,9 @@ public void onCompleted() { onError(e); return; } - observer.onCompleted(); // Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer done = true; + observer.onCompleted(); } @Override @@ -57,6 +57,7 @@ public void onError(Throwable e) { if (done) { return; } + done = true; try { doOnEachObserver.onError(e); } catch (Throwable e2) { @@ -64,7 +65,6 @@ public void onError(Throwable e) { return; } observer.onError(e); - done = true; } @Override diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeWhile.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeWhile.java index 740d8762e1..098bf958f0 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeWhile.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeWhile.java @@ -49,12 +49,15 @@ public Subscriber call(final Subscriber subscriber) { private int counter = 0; + private boolean done = false; + @Override public void onNext(T args) { boolean isSelected; try { isSelected = predicate.call(args, counter++); } catch (Throwable e) { + done = true; subscriber.onError(e); unsubscribe(); return; @@ -62,6 +65,7 @@ public void onNext(T args) { if (isSelected) { subscriber.onNext(args); } else { + done = true; subscriber.onCompleted(); unsubscribe(); } @@ -69,12 +73,16 @@ public void onNext(T args) { @Override public void onCompleted() { - subscriber.onCompleted(); + if (!done) { + subscriber.onCompleted(); + } } @Override public void onError(Throwable e) { - subscriber.onError(e); + if (!done) { + subscriber.onError(e); + } } };