From 340bed9cde1525228cc27b32b22e52dbfd9ed080 Mon Sep 17 00:00:00 2001 From: Mathieu Gabriel Date: Tue, 18 Jul 2017 09:32:45 +0200 Subject: [PATCH] FlowableWithLatestFrom forgets request (#5494) * FlowableWithLatestFrom forgets request * Add unit test and correct testBackpressure * Revert indentation change * Revert import change --- .../flowable/FlowableWithLatestFrom.java | 2 + .../flowable/FlowableWithLatestFromTest.java | 37 ++++++++++++++++++- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java index 4448a7f038..6782e4282e 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java @@ -81,6 +81,8 @@ public void onNext(T t) { return; } actual.onNext(r); + } else{ + request(1); } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java index b4bb8afcde..c3dc8bbaa4 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java @@ -261,7 +261,7 @@ public void testNoDownstreamUnsubscribe() { @Test public void testBackpressure() { - Flowable source = Flowable.range(1, 10); + PublishProcessor source = PublishProcessor.create(); PublishProcessor other = PublishProcessor.create(); Flowable result = source.withLatestFrom(other, COMBINER); @@ -274,17 +274,24 @@ public void testBackpressure() { ts.request(1); + source.onNext(1); + assertTrue("Other has no observers!", other.hasSubscribers()); ts.assertNoValues(); other.onNext(1); - ts.request(1); + source.onNext(2); ts.assertValue((2 << 8) + 1); ts.request(5); + source.onNext(3); + source.onNext(4); + source.onNext(5); + source.onNext(6); + source.onNext(7); ts.assertValues( (2 << 8) + 1, (3 << 8) + 1, (4 << 8) + 1, (5 << 8) + 1, (6 << 8) + 1, (7 << 8) + 1 @@ -717,4 +724,30 @@ public void zeroOtherCombinerReturnsNull() { .test() .assertFailureAndMessage(NullPointerException.class, "The combiner returned a null value"); } + + @Test + public void testSingleRequestNotForgottenWhenNoData() { + PublishProcessor source = PublishProcessor.create(); + PublishProcessor other = PublishProcessor.create(); + + Flowable result = source.withLatestFrom(other, COMBINER); + + TestSubscriber ts = new TestSubscriber(0L); + + result.subscribe(ts); + + ts.request(1); + + source.onNext(1); + + ts.assertNoValues(); + + other.onNext(1); + + ts.assertNoValues(); + + source.onNext(2); + + ts.assertValue((2 << 8) + 1); + } }