diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java index 99b8d86155..9401c202b0 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java @@ -600,7 +600,7 @@ public void onSubscribe(Subscription s) { if (s instanceof QueueSubscription) { @SuppressWarnings("unchecked") QueueSubscription qs = (QueueSubscription) s; - int m = qs.requestFusion(QueueSubscription.ANY); + int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY); if (m == QueueSubscription.SYNC) { fusionMode = m; queue = qs; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java index c64d3270e9..c01ea6ef20 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java @@ -357,7 +357,7 @@ public void onSubscribe(Subscription s) { if (s instanceof QueueSubscription) { QueueSubscription f = (QueueSubscription) s; - int m = f.requestFusion(QueueSubscription.ANY); + int m = f.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY); if (m == QueueSubscription.SYNC) { sourceMode = m; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java index 606f810019..cb4ee05c60 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java @@ -532,7 +532,7 @@ public void onSubscribe(Disposable s) { @SuppressWarnings("unchecked") QueueDisposable qd = (QueueDisposable) s; - int m = qd.requestFusion(QueueDisposable.ANY); + int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { fusionMode = m; queue = qd; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java index ef9b503539..8a373dbe11 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.flowable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.*; @@ -897,4 +898,32 @@ public void scalarXMap() { .test() .assertResult(2); } + + @Test + public void noCrossBoundaryFusion() { + for (int i = 0; i < 500; i++) { + TestSubscriber ts = Flowable.merge( + Flowable.just(1).observeOn(Schedulers.single()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }), + Flowable.just(1).observeOn(Schedulers.computation()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }) + ) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(2); + + List list = ts.values(); + + assertTrue(list.toString(), list.contains("RxSi")); + assertTrue(list.toString(), list.contains("RxCo")); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java index eb82ba96c2..b80d7dbd5e 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java @@ -1767,4 +1767,38 @@ public Integer apply(Integer a, Integer b) throws Exception { .test(0L) .assertFailure(TestException.class); } + + @Test + public void noCrossBoundaryFusion() { + for (int i = 0; i < 500; i++) { + TestSubscriber> ts = Flowable.zip( + Flowable.just(1).observeOn(Schedulers.single()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }), + Flowable.just(1).observeOn(Schedulers.computation()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }), + new BiFunction>() { + @Override + public List apply(Object t1, Object t2) throws Exception { + return Arrays.asList(t1, t2); + } + } + ) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1); + + List list = ts.values().get(0); + + assertTrue(list.toString(), list.contains("RxSi")); + assertTrue(list.toString(), list.contains("RxCo")); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java index e31725db33..4cc8425efb 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java @@ -756,4 +756,32 @@ public Integer apply(Integer w) throws Exception { TestHelper.assertError(errors, 1, TestException.class); } + + @Test + public void noCrossBoundaryFusion() { + for (int i = 0; i < 500; i++) { + TestObserver ts = Observable.merge( + Observable.just(1).observeOn(Schedulers.single()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }), + Observable.just(1).observeOn(Schedulers.computation()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }) + ) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(2); + + List list = ts.values(); + + assertTrue(list.toString(), list.contains("RxSi")); + assertTrue(list.toString(), list.contains("RxCo")); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java index c304226ad4..73e5d936fe 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.*; @@ -999,8 +1000,8 @@ public Object apply(final Object[] args) { public void testDownstreamBackpressureRequestsWithFiniteSyncObservables() { AtomicInteger generatedA = new AtomicInteger(); AtomicInteger generatedB = new AtomicInteger(); - Observable o1 = createInfiniteObservable(generatedA).take(Flowable.bufferSize() * 2); - Observable o2 = createInfiniteObservable(generatedB).take(Flowable.bufferSize() * 2); + Observable o1 = createInfiniteObservable(generatedA).take(Observable.bufferSize() * 2); + Observable o2 = createInfiniteObservable(generatedB).take(Observable.bufferSize() * 2); TestObserver ts = new TestObserver(); Observable.zip(o1, o2, new BiFunction() { @@ -1010,14 +1011,14 @@ public String apply(Integer t1, Integer t2) { return t1 + "-" + t2; } - }).observeOn(Schedulers.computation()).take(Flowable.bufferSize() * 2).subscribe(ts); + }).observeOn(Schedulers.computation()).take(Observable.bufferSize() * 2).subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); - assertEquals(Flowable.bufferSize() * 2, ts.valueCount()); + assertEquals(Observable.bufferSize() * 2, ts.valueCount()); System.out.println("Generated => A: " + generatedA.get() + " B: " + generatedB.get()); - assertTrue(generatedA.get() < (Flowable.bufferSize() * 3)); - assertTrue(generatedB.get() < (Flowable.bufferSize() * 3)); + assertTrue(generatedA.get() < (Observable.bufferSize() * 3)); + assertTrue(generatedB.get() < (Observable.bufferSize() * 3)); } private Observable createInfiniteObservable(final AtomicInteger generated) { @@ -1358,4 +1359,37 @@ public Object apply(Integer a, Integer b) throws Exception { } })); } -} + + @Test + public void noCrossBoundaryFusion() { + for (int i = 0; i < 500; i++) { + TestObserver> ts = Observable.zip( + Observable.just(1).observeOn(Schedulers.single()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }), + Observable.just(1).observeOn(Schedulers.computation()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }), + new BiFunction>() { + @Override + public List apply(Object t1, Object t2) throws Exception { + return Arrays.asList(t1, t2); + } + } + ) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1); + + List list = ts.values().get(0); + + assertTrue(list.toString(), list.contains("RxSi")); + assertTrue(list.toString(), list.contains("RxCo")); + } + }}