diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java index fd8d7767a8..d133c23428 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java @@ -233,12 +233,11 @@ public void drain() { int missed = 1; InnerQueuedSubscriber inner = current; Subscriber a = actual; - long r = requested.get(); - long e = 0L; ErrorMode em = errorMode; - outer: for (;;) { + long r = requested.get(); + long e = 0L; if (inner == null) { @@ -271,6 +270,8 @@ public void drain() { } } + boolean continueNextSource = false; + if (inner != null) { SimpleQueue q = inner.queue(); if (q != null) { @@ -313,7 +314,8 @@ public void drain() { inner = null; current = null; s.request(1); - continue outer; + continueNextSource = true; + break; } if (empty) { @@ -353,15 +355,18 @@ public void drain() { inner = null; current = null; s.request(1); - continue; + continueNextSource = true; } } } } if (e != 0L && r != Long.MAX_VALUE) { - r = requested.addAndGet(-e); - e = 0L; + requested.addAndGet(-e); + } + + if (continueNextSource) { + continue; } missed = addAndGet(-missed); diff --git a/src/main/java/io/reactivex/internal/subscribers/InnerQueuedSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/InnerQueuedSubscriber.java index 83505120e7..70aeadce3f 100644 --- a/src/main/java/io/reactivex/internal/subscribers/InnerQueuedSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/InnerQueuedSubscriber.java @@ -72,14 +72,14 @@ public void onSubscribe(Subscription s) { if (m == QueueSubscription.ASYNC) { fusionMode = m; queue = qs; - QueueDrainHelper.request(get(), prefetch); + QueueDrainHelper.request(s, prefetch); return; } } queue = QueueDrainHelper.createQueue(prefetch); - QueueDrainHelper.request(get(), prefetch); + QueueDrainHelper.request(s, prefetch); } } @@ -104,22 +104,26 @@ public void onComplete() { @Override public void request(long n) { - long p = produced + n; - if (p >= limit) { - produced = 0L; - get().request(p); - } else { - produced = p; + if (fusionMode != QueueSubscription.SYNC) { + long p = produced + n; + if (p >= limit) { + produced = 0L; + get().request(p); + } else { + produced = p; + } } } public void requestOne() { - long p = produced + 1; - if (p == limit) { - produced = 0L; - get().request(p); - } else { - produced = p; + if (fusionMode != QueueSubscription.SYNC) { + long p = produced + 1; + if (p == limit) { + produced = 0L; + get().request(p); + } else { + produced = p; + } } } diff --git a/src/test/java/io/reactivex/TestHelper.java b/src/test/java/io/reactivex/TestHelper.java index 06aaabda2a..f40fcb8a7c 100644 --- a/src/test/java/io/reactivex/TestHelper.java +++ b/src/test/java/io/reactivex/TestHelper.java @@ -30,6 +30,7 @@ import io.reactivex.disposables.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.*; import io.reactivex.internal.operators.maybe.MaybeToFlowable; import io.reactivex.internal.operators.single.SingleToFlowable; @@ -144,21 +145,25 @@ public void accept(Throwable t) { } public static void assertError(List list, int index, Class clazz) { - try { - assertTrue(list.get(index).toString(), clazz.isInstance(list.get(index))); - } catch (AssertionError e) { - list.get(index).printStackTrace(); - throw e; + Throwable ex = list.get(index); + if (!clazz.isInstance(ex)) { + AssertionError err = new AssertionError(clazz + " expected but got " + list.get(index)); + err.initCause(list.get(index)); + throw err; } } public static void assertError(List list, int index, Class clazz, String message) { - try { - assertTrue(list.get(index).toString(), clazz.isInstance(list.get(index))); - assertEquals(message, list.get(index).getMessage()); - } catch (AssertionError e) { - list.get(index).printStackTrace(); - throw e; + Throwable ex = list.get(index); + if (!clazz.isInstance(ex)) { + AssertionError err = new AssertionError("Type " + clazz + " expected but got " + ex); + err.initCause(ex); + throw err; + } + if (!ObjectHelper.equals(message, ex.getMessage())) { + AssertionError err = new AssertionError("Message " + message + " expected but got " + ex.getMessage()); + err.initCause(ex); + throw err; } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java index 30c8c5dadf..55525c84f0 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java @@ -577,11 +577,12 @@ public void testAsynchronousRun() { public Flowable apply(Integer t) { return Flowable.range(1, 1000).subscribeOn(Schedulers.computation()); } - }).observeOn(Schedulers.newThread()).subscribe(ts); - - ts.awaitTerminalEvent(5, TimeUnit.SECONDS); - ts.assertNoErrors(); - ts.assertValueCount(2000); + }).observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertNoErrors() + .assertValueCount(2000) + .assertComplete(); } @Test