diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java index 03ddb6c0ce..b7cbe1d3fc 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java @@ -359,7 +359,7 @@ public void onSubscribe(Subscription s) { @SuppressWarnings("unchecked") QueueSubscription qs = (QueueSubscription) s; - int m = qs.requestFusion(QueueSubscription.ANY); + int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY); if (m == QueueSubscription.SYNC) { fusionMode = m; queue = qs; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java index 2d22639af6..032f84dbc6 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java @@ -348,7 +348,7 @@ public void onSubscribe(Disposable s) { @SuppressWarnings("unchecked") QueueDisposable qd = (QueueDisposable) s; - int m = qd.requestFusion(QueueDisposable.ANY); + int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { queue = qd; done = true; diff --git a/src/test/java/io/reactivex/TestHelper.java b/src/test/java/io/reactivex/TestHelper.java index 9e31194f0f..60a7fdfd81 100644 --- a/src/test/java/io/reactivex/TestHelper.java +++ b/src/test/java/io/reactivex/TestHelper.java @@ -2922,4 +2922,214 @@ public void request(long n) { } }; } + + static final class FlowableStripBoundary extends Flowable implements FlowableTransformer { + + final Flowable source; + + FlowableStripBoundary(Flowable source) { + this.source = source; + } + + @Override + public Flowable apply(Flowable upstream) { + return new FlowableStripBoundary(upstream); + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new StripBoundarySubscriber(s)); + } + + static final class StripBoundarySubscriber implements FlowableSubscriber, QueueSubscription { + + final Subscriber actual; + + Subscription upstream; + + QueueSubscription qs; + + StripBoundarySubscriber(Subscriber actual) { + this.actual = actual; + } + + @SuppressWarnings("unchecked") + @Override + public void onSubscribe(Subscription subscription) { + this.upstream = subscription; + if (subscription instanceof QueueSubscription) { + qs = (QueueSubscription)subscription; + } + actual.onSubscribe(this); + } + + @Override + public void onNext(T t) { + actual.onNext(t); + } + + @Override + public void onError(Throwable throwable) { + actual.onError(throwable); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public int requestFusion(int mode) { + QueueSubscription fs = qs; + if (fs != null) { + return fs.requestFusion(mode & ~BOUNDARY); + } + return NONE; + } + + @Override + public boolean offer(T value) { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override + public boolean offer(T v1, T v2) { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override + public T poll() throws Exception { + return qs.poll(); + } + + @Override + public void clear() { + qs.clear(); + } + + @Override + public boolean isEmpty() { + return qs.isEmpty(); + } + + @Override + public void request(long n) { + upstream.request(n); + } + + @Override + public void cancel() { + upstream.cancel(); + } + } + } + + public static FlowableTransformer flowableStripBoundary() { + return new FlowableStripBoundary(null); + } + + static final class ObservableStripBoundary extends Observable implements ObservableTransformer { + + final Observable source; + + ObservableStripBoundary(Observable source) { + this.source = source; + } + + @Override + public Observable apply(Observable upstream) { + return new ObservableStripBoundary(upstream); + } + + @Override + protected void subscribeActual(Observer s) { + source.subscribe(new StripBoundaryObserver(s)); + } + + static final class StripBoundaryObserver implements Observer, QueueDisposable { + + final Observer actual; + + Disposable upstream; + + QueueDisposable qd; + + StripBoundaryObserver(Observer actual) { + this.actual = actual; + } + + @SuppressWarnings("unchecked") + @Override + public void onSubscribe(Disposable d) { + this.upstream = d; + if (d instanceof QueueDisposable) { + qd = (QueueDisposable)d; + } + actual.onSubscribe(this); + } + + @Override + public void onNext(T t) { + actual.onNext(t); + } + + @Override + public void onError(Throwable throwable) { + actual.onError(throwable); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public int requestFusion(int mode) { + QueueDisposable fs = qd; + if (fs != null) { + return fs.requestFusion(mode & ~BOUNDARY); + } + return NONE; + } + + @Override + public boolean offer(T value) { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override + public boolean offer(T v1, T v2) { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override + public T poll() throws Exception { + return qd.poll(); + } + + @Override + public void clear() { + qd.clear(); + } + + @Override + public boolean isEmpty() { + return qd.isEmpty(); + } + + @Override + public void dispose() { + upstream.dispose(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + } + } + + public static ObservableTransformer observableStripBoundary() { + return new ObservableStripBoundary(null); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java index 8aeda04f47..28f599e487 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java @@ -33,7 +33,7 @@ import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; -import io.reactivex.schedulers.TestScheduler; +import io.reactivex.schedulers.*; import io.reactivex.subscribers.*; public class FlowableSwitchTest { @@ -1144,12 +1144,16 @@ public void run() { @Test public void fusedInnerCrash() { Flowable.just(1).hide() - .switchMap(Functions.justFunction(Flowable.just(1).map(new Function() { - @Override - public Object apply(Integer v) throws Exception { - throw new TestException(); - } - }))) + .switchMap(Functions.justFunction(Flowable.just(1) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .compose(TestHelper.flowableStripBoundary()) + ) + ) .test() .assertFailure(TestException.class); } @@ -1174,4 +1178,30 @@ public void innerCancelledOnMainError() { ts.assertFailure(TestException.class); } + + @Test + public void fusedBoundary() { + String thread = Thread.currentThread().getName(); + + Flowable.range(1, 10000) + .switchMap(new Function>() { + @Override + public Flowable apply(Integer v) + throws Exception { + return Flowable.just(2).hide() + .observeOn(Schedulers.single()) + .map(new Function() { + @Override + public Object apply(Integer w) throws Exception { + return Thread.currentThread().getName(); + } + }); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertNever(thread) + .assertNoErrors() + .assertComplete(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java index 00fb200378..beae5a788c 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.List; @@ -26,15 +27,14 @@ import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.*; -import io.reactivex.functions.Consumer; -import io.reactivex.functions.Function; +import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.schedulers.ImmediateThinScheduler; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.schedulers.TestScheduler; -import io.reactivex.subjects.PublishSubject; +import io.reactivex.schedulers.*; +import io.reactivex.subjects.*; public class ObservableSwitchTest { @@ -1121,6 +1121,7 @@ public Integer apply(Integer v) throws Exception { throw new TestException(); } }) + .compose(TestHelper.observableStripBoundary()) )) .test(); @@ -1148,6 +1149,7 @@ public Integer apply(Integer v) throws Exception { throw new TestException(); } }) + .compose(TestHelper.observableStripBoundary()) )) .test(); @@ -1166,4 +1168,30 @@ public Integer apply(Integer v) throws Exception { assertFalse(ps.hasObservers()); } + + @Test + public void fusedBoundary() { + String thread = Thread.currentThread().getName(); + + Observable.range(1, 10000) + .switchMap(new Function>() { + @Override + public ObservableSource apply(Integer v) + throws Exception { + return Observable.just(2).hide() + .observeOn(Schedulers.single()) + .map(new Function() { + @Override + public Object apply(Integer w) throws Exception { + return Thread.currentThread().getName(); + } + }); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertNever(thread) + .assertNoErrors() + .assertComplete(); + } }