diff --git a/src/main/java/rx/internal/operators/OnSubscribeRedo.java b/src/main/java/rx/internal/operators/OnSubscribeRedo.java index f0a761c948..3cb521fd06 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRedo.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRedo.java @@ -238,8 +238,11 @@ public void onNext(T v) { child.onNext(v); while (true) { long cc = consumerCapacity.get(); - if (cc != Long.MAX_VALUE) { - if (consumerCapacity.compareAndSet(cc, cc -1)) { + if (cc == 0) { + // wasn't requested but has arrived anyway + break; + } else if (cc != Long.MAX_VALUE) { + if (consumerCapacity.compareAndSet(cc, cc - 1)) { break; } } else { @@ -320,10 +323,14 @@ public void onError(Throwable e) { @Override public void onNext(Object t) { + // a restart instruction has arrived if (!isLocked.get() && !child.isUnsubscribed()) { + // if there are outstanding requests if (consumerCapacity.get() > 0) { + // schedule resubscription worker.schedule(subscribeToSource); } else { + // otherwise we indicate that on the next request we should resubscribe resumeBoundary.compareAndSet(false, true); } }