diff --git a/src/main/java/rx/internal/operators/OperatorConcat.java b/src/main/java/rx/internal/operators/OperatorConcat.java index f1a429dea4..8e8514b9ef 100644 --- a/src/main/java/rx/internal/operators/OperatorConcat.java +++ b/src/main/java/rx/internal/operators/OperatorConcat.java @@ -115,7 +115,7 @@ public void onStart() { private void requestFromChild(long n) { // we track 'requested' so we know whether we should subscribe the next or not ConcatInnerSubscriber actualSubscriber = currentSubscriber; - if (REQUESTED_UPDATER.getAndAdd(this, n) == 0) { + if (n > 0 && BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n) == 0) { if (actualSubscriber == null && wip > 0) { // this means we may be moving from one subscriber to another after having stopped processing // so need to kick off the subscribe via this request notification diff --git a/src/test/java/rx/internal/operators/OperatorConcatTest.java b/src/test/java/rx/internal/operators/OperatorConcatTest.java index 5ad80c6d70..75bfee65f4 100644 --- a/src/test/java/rx/internal/operators/OperatorConcatTest.java +++ b/src/test/java/rx/internal/operators/OperatorConcatTest.java @@ -16,6 +16,7 @@ package rx.internal.operators; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -35,7 +36,9 @@ import org.mockito.InOrder; import rx.Observable.OnSubscribe; +import rx.Scheduler.Worker; import rx.*; +import rx.functions.Action0; import rx.functions.Func1; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; @@ -766,4 +769,30 @@ public void onError(Throwable e) { assertEquals(n, counter.get()); } + + @Test + public void testRequestOverflowDoesNotStallStream() { + Observable o1 = Observable.just(1,2,3); + Observable o2 = Observable.just(4,5,6); + final AtomicBoolean completed = new AtomicBoolean(false); + o1.concatWith(o2).subscribe(new Subscriber() { + + @Override + public void onCompleted() { + completed.set(true); + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Integer t) { + request(2); + }}); + + assertTrue(completed.get()); + } + }