From d3ed2690d5d7840b2ed190032f9324aec9a7d8a9 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Fri, 2 Mar 2018 09:56:40 +0100 Subject: [PATCH] 2.x: Add Flowable.switchMap{Maybe,Single}{DelayError} operators (#5873) --- src/main/java/io/reactivex/Flowable.java | 140 ++++ .../mixed/FlowableSwitchMapMaybe.java | 303 ++++++++ .../mixed/FlowableSwitchMapSingle.java | 292 ++++++++ .../mixed/FlowableSwitchMapMaybeTest.java | 649 ++++++++++++++++++ .../mixed/FlowableSwitchMapSingleTest.java | 606 ++++++++++++++++ 5 files changed, 1990 insertions(+) create mode 100644 src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybe.java create mode 100644 src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java create mode 100644 src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybeTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingleTest.java diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 02c84ed989..0e63ea2f1a 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -14648,6 +14648,146 @@ Flowable switchMap0(Function> return RxJavaPlugins.onAssembly(new FlowableSwitchMap(this, mapper, bufferSize, delayError)); } + /** + * Maps the upstream items into {@link MaybeSource}s and switches (subscribes) to the newer ones + * while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one if + * available while failing immediately if this {@code Flowable} or any of the + * active inner {@code MaybeSource}s fail. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream. The main {@code Flowable} is consumed in an + * unbounded manner (i.e., without backpressure).
+ *
Scheduler:
+ *
{@code switchMapMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
This operator terminates with an {@code onError} if this {@code Flowable} or any of + * the inner {@code MaybeSource}s fail while they are active. When this happens concurrently, their + * individual {@code Throwable} errors may get combined and emitted as a single + * {@link io.reactivex.exceptions.CompositeException CompositeException}. Otherwise, a late + * (i.e., inactive or switched out) {@code onError} from this {@code Flowable} or from any of + * the inner {@code MaybeSource}s will be forwarded to the global error handler via + * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} as + * {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}
+ *
+ * @param the output value type + * @param mapper the function called with the current upstream event and should + * return a {@code MaybeSource} to replace the current active inner source + * and get subscribed to. + * @return the new Flowable instance + * @since 2.1.11 - experimental + * @see #switchMapMaybe(Function) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable switchMapMaybe(@NonNull Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new FlowableSwitchMapMaybe(this, mapper, false)); + } + + /** + * Maps the upstream items into {@link MaybeSource}s and switches (subscribes) to the newer ones + * while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one if + * available, delaying errors from this {@code Flowable} or the inner {@code MaybeSource}s until all terminate. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream. The main {@code Flowable} is consumed in an + * unbounded manner (i.e., without backpressure).
+ *
Scheduler:
+ *
{@code switchMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the output value type + * @param mapper the function called with the current upstream event and should + * return a {@code MaybeSource} to replace the current active inner source + * and get subscribed to. + * @return the new Flowable instance + * @since 2.1.11 - experimental + * @see #switchMapMaybe(Function) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable switchMapMaybeDelayError(@NonNull Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new FlowableSwitchMapMaybe(this, mapper, true)); + } + + /** + * Maps the upstream items into {@link SingleSource}s and switches (subscribes) to the newer ones + * while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one + * while failing immediately if this {@code Flowable} or any of the + * active inner {@code SingleSource}s fail. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream. The main {@code Flowable} is consumed in an + * unbounded manner (i.e., without backpressure).
+ *
Scheduler:
+ *
{@code switchMapSingle} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
This operator terminates with an {@code onError} if this {@code Flowable} or any of + * the inner {@code SingleSource}s fail while they are active. When this happens concurrently, their + * individual {@code Throwable} errors may get combined and emitted as a single + * {@link io.reactivex.exceptions.CompositeException CompositeException}. Otherwise, a late + * (i.e., inactive or switched out) {@code onError} from this {@code Flowable} or from any of + * the inner {@code SingleSource}s will be forwarded to the global error handler via + * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} as + * {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}
+ *
+ * @param the output value type + * @param mapper the function called with the current upstream event and should + * return a {@code SingleSource} to replace the current active inner source + * and get subscribed to. + * @return the new Flowable instance + * @since 2.1.11 - experimental + * @see #switchMapSingle(Function) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable switchMapSingle(@NonNull Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new FlowableSwitchMapSingle(this, mapper, false)); + } + + /** + * Maps the upstream items into {@link SingleSource}s and switches (subscribes) to the newer ones + * while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one, + * delaying errors from this {@code Flowable} or the inner {@code SingleSource}s until all terminate. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream. The main {@code Flowable} is consumed in an + * unbounded manner (i.e., without backpressure).
+ *
Scheduler:
+ *
{@code switchMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the output value type + * @param mapper the function called with the current upstream event and should + * return a {@code SingleSource} to replace the current active inner source + * and get subscribed to. + * @return the new Flowable instance + * @since 2.1.11 - experimental + * @see #switchMapSingle(Function) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable switchMapSingleDelayError(@NonNull Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new FlowableSwitchMapSingle(this, mapper, true)); + } + /** * Returns a Flowable that emits only the first {@code count} items emitted by the source Publisher. If the source emits fewer than * {@code count} items then all of its items are emitted. diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybe.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybe.java new file mode 100644 index 0000000000..95b0436321 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybe.java @@ -0,0 +1,303 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps the upstream items into {@link MaybeSource}s and switches (subscribes) to the newer ones + * while disposing the older ones and emits the latest success value if available, optionally delaying + * errors from the main source or the inner sources. + * + * @param the upstream value type + * @param the downstream value type + * @since 2.1.11 - experimental + */ +@Experimental +public final class FlowableSwitchMapMaybe extends Flowable { + + final Flowable source; + + final Function> mapper; + + final boolean delayErrors; + + public FlowableSwitchMapMaybe(Flowable source, + Function> mapper, + boolean delayErrors) { + this.source = source; + this.mapper = mapper; + this.delayErrors = delayErrors; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new SwitchMapMaybeSubscriber(s, mapper, delayErrors)); + } + + static final class SwitchMapMaybeSubscriber extends AtomicInteger + implements FlowableSubscriber, Subscription { + + private static final long serialVersionUID = -5402190102429853762L; + + final Subscriber downstream; + + final Function> mapper; + + final boolean delayErrors; + + final AtomicThrowable errors; + + final AtomicLong requested; + + final AtomicReference> inner; + + static final SwitchMapMaybeObserver INNER_DISPOSED = + new SwitchMapMaybeObserver(null); + + Subscription upstream; + + volatile boolean done; + + volatile boolean cancelled; + + long emitted; + + SwitchMapMaybeSubscriber(Subscriber downstream, + Function> mapper, + boolean delayErrors) { + this.downstream = downstream; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.errors = new AtomicThrowable(); + this.requested = new AtomicLong(); + this.inner = new AtomicReference>(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(upstream, s)) { + upstream = s; + downstream.onSubscribe(this); + s.request(Long.MAX_VALUE); + } + } + + @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void onNext(T t) { + SwitchMapMaybeObserver current = inner.get(); + if (current != null) { + current.dispose(); + } + + MaybeSource ms; + + try { + ms = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null MaybeSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.cancel(); + inner.getAndSet((SwitchMapMaybeObserver)INNER_DISPOSED); + onError(ex); + return; + } + + SwitchMapMaybeObserver observer = new SwitchMapMaybeObserver(this); + + for (;;) { + current = inner.get(); + if (current == INNER_DISPOSED) { + break; + } + if (inner.compareAndSet(current, observer)) { + ms.subscribe(observer); + break; + } + } + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + if (!delayErrors) { + disposeInner(); + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + void disposeInner() { + SwitchMapMaybeObserver current = inner.getAndSet((SwitchMapMaybeObserver)INNER_DISPOSED); + if (current != null && current != INNER_DISPOSED) { + current.dispose(); + } + } + + @Override + public void request(long n) { + BackpressureHelper.add(requested, n); + drain(); + } + + @Override + public void cancel() { + cancelled = true; + upstream.cancel(); + disposeInner(); + } + + void innerError(SwitchMapMaybeObserver sender, Throwable ex) { + if (inner.compareAndSet(sender, null)) { + if (errors.addThrowable(ex)) { + if (!delayErrors) { + upstream.cancel(); + disposeInner(); + } + drain(); + return; + } + } + RxJavaPlugins.onError(ex); + } + + void innerComplete(SwitchMapMaybeObserver sender) { + if (inner.compareAndSet(sender, null)) { + drain(); + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + Subscriber downstream = this.downstream; + AtomicThrowable errors = this.errors; + AtomicReference> inner = this.inner; + AtomicLong requested = this.requested; + long emitted = this.emitted; + + for (;;) { + + for (;;) { + if (cancelled) { + return; + } + + if (errors.get() != null) { + if (!delayErrors) { + Throwable ex = errors.terminate(); + downstream.onError(ex); + return; + } + } + + boolean d = done; + SwitchMapMaybeObserver current = inner.get(); + boolean empty = current == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex != null) { + downstream.onError(ex); + } else { + downstream.onComplete(); + } + return; + } + + if (empty || current.item == null || emitted == requested.get()) { + break; + } + + inner.compareAndSet(current, null); + + downstream.onNext(current.item); + + emitted++; + } + + this.emitted = emitted; + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + static final class SwitchMapMaybeObserver + extends AtomicReference implements MaybeObserver { + + private static final long serialVersionUID = 8042919737683345351L; + + final SwitchMapMaybeSubscriber parent; + + volatile R item; + + SwitchMapMaybeObserver(SwitchMapMaybeSubscriber parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(R t) { + item = t; + parent.drain(); + } + + @Override + public void onError(Throwable e) { + parent.innerError(this, e); + } + + @Override + public void onComplete() { + parent.innerComplete(this); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java new file mode 100644 index 0000000000..813f779902 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java @@ -0,0 +1,292 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps the upstream items into {@link SingleSource}s and switches (subscribes) to the newer ones + * while disposing the older ones and emits the latest success value, optionally delaying + * errors from the main source or the inner sources. + * + * @param the upstream value type + * @param the downstream value type + * @since 2.1.11 - experimental + */ +@Experimental +public final class FlowableSwitchMapSingle extends Flowable { + + final Flowable source; + + final Function> mapper; + + final boolean delayErrors; + + public FlowableSwitchMapSingle(Flowable source, + Function> mapper, + boolean delayErrors) { + this.source = source; + this.mapper = mapper; + this.delayErrors = delayErrors; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new SwitchMapSingleSubscriber(s, mapper, delayErrors)); + } + + static final class SwitchMapSingleSubscriber extends AtomicInteger + implements FlowableSubscriber, Subscription { + + private static final long serialVersionUID = -5402190102429853762L; + + final Subscriber downstream; + + final Function> mapper; + + final boolean delayErrors; + + final AtomicThrowable errors; + + final AtomicLong requested; + + final AtomicReference> inner; + + static final SwitchMapSingleObserver INNER_DISPOSED = + new SwitchMapSingleObserver(null); + + Subscription upstream; + + volatile boolean done; + + volatile boolean cancelled; + + long emitted; + + SwitchMapSingleSubscriber(Subscriber downstream, + Function> mapper, + boolean delayErrors) { + this.downstream = downstream; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.errors = new AtomicThrowable(); + this.requested = new AtomicLong(); + this.inner = new AtomicReference>(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(upstream, s)) { + upstream = s; + downstream.onSubscribe(this); + s.request(Long.MAX_VALUE); + } + } + + @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void onNext(T t) { + SwitchMapSingleObserver current = inner.get(); + if (current != null) { + current.dispose(); + } + + SingleSource ms; + + try { + ms = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null SingleSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.cancel(); + inner.getAndSet((SwitchMapSingleObserver)INNER_DISPOSED); + onError(ex); + return; + } + + SwitchMapSingleObserver observer = new SwitchMapSingleObserver(this); + + for (;;) { + current = inner.get(); + if (current == INNER_DISPOSED) { + break; + } + if (inner.compareAndSet(current, observer)) { + ms.subscribe(observer); + break; + } + } + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + if (!delayErrors) { + disposeInner(); + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + void disposeInner() { + SwitchMapSingleObserver current = inner.getAndSet((SwitchMapSingleObserver)INNER_DISPOSED); + if (current != null && current != INNER_DISPOSED) { + current.dispose(); + } + } + + @Override + public void request(long n) { + BackpressureHelper.add(requested, n); + drain(); + } + + @Override + public void cancel() { + cancelled = true; + upstream.cancel(); + disposeInner(); + } + + void innerError(SwitchMapSingleObserver sender, Throwable ex) { + if (inner.compareAndSet(sender, null)) { + if (errors.addThrowable(ex)) { + if (!delayErrors) { + upstream.cancel(); + disposeInner(); + } + drain(); + return; + } + } + RxJavaPlugins.onError(ex); + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + Subscriber downstream = this.downstream; + AtomicThrowable errors = this.errors; + AtomicReference> inner = this.inner; + AtomicLong requested = this.requested; + long emitted = this.emitted; + + for (;;) { + + for (;;) { + if (cancelled) { + return; + } + + if (errors.get() != null) { + if (!delayErrors) { + Throwable ex = errors.terminate(); + downstream.onError(ex); + return; + } + } + + boolean d = done; + SwitchMapSingleObserver current = inner.get(); + boolean empty = current == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex != null) { + downstream.onError(ex); + } else { + downstream.onComplete(); + } + return; + } + + if (empty || current.item == null || emitted == requested.get()) { + break; + } + + inner.compareAndSet(current, null); + + downstream.onNext(current.item); + + emitted++; + } + + this.emitted = emitted; + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + static final class SwitchMapSingleObserver + extends AtomicReference implements SingleObserver { + + private static final long serialVersionUID = 8042919737683345351L; + + final SwitchMapSingleSubscriber parent; + + volatile R item; + + SwitchMapSingleObserver(SwitchMapSingleSubscriber parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(R t) { + item = t; + parent.drain(); + } + + @Override + public void onError(Throwable e) { + parent.innerError(this, e); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybeTest.java new file mode 100644 index 0000000000..d080928047 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybeTest.java @@ -0,0 +1,649 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subjects.MaybeSubject; +import io.reactivex.subscribers.TestSubscriber; + +public class FlowableSwitchMapMaybeTest { + + @Test + public void simple() { + Flowable.range(1, 5) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void simpleEmpty() { + Flowable.range(1, 5) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.empty(); + } + }) + .test() + .assertResult(); + } + + @Test + public void simpleMixed() { + Flowable.range(1, 10) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v % 2 == 0) { + return Maybe.just(v); + } + return Maybe.empty(); + } + }) + .test() + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void backpressured() { + TestSubscriber ts = Flowable.range(1, 1024) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v % 2 == 0) { + return Maybe.just(v); + } + return Maybe.empty(); + } + }) + .test(0L); + + // backpressure results items skipped + ts + .requestMore(1) + .assertResult(1024); + } + + @Test + public void mainError() { + Flowable.error(new TestException()) + .switchMapMaybe(Functions.justFunction(Maybe.never())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerError() { + Flowable.just(1) + .switchMapMaybe(Functions.justFunction(Maybe.error(new TestException()))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) + throws Exception { + return f + .switchMapMaybe(Functions.justFunction(Maybe.never())); + } + } + ); + } + + @Test + public void limit() { + Flowable.range(1, 5) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }) + .limit(3) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void switchOver() { + PublishProcessor pp = PublishProcessor.create(); + + final MaybeSubject ms1 = MaybeSubject.create(); + final MaybeSubject ms2 = MaybeSubject.create(); + + TestSubscriber ts = pp.switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms1; + } + return ms2; + } + }).test(); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms1.hasObservers()); + + pp.onNext(2); + + assertFalse(ms1.hasObservers()); + assertTrue(ms2.hasObservers()); + + ms2.onError(new TestException()); + + assertFalse(pp.hasSubscribers()); + + ts.assertFailure(TestException.class); + } + + @Test + public void switchOverDelayError() { + PublishProcessor pp = PublishProcessor.create(); + + final MaybeSubject ms1 = MaybeSubject.create(); + final MaybeSubject ms2 = MaybeSubject.create(); + + TestSubscriber ts = pp.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms1; + } + return ms2; + } + }).test(); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms1.hasObservers()); + + pp.onNext(2); + + assertFalse(ms1.hasObservers()); + assertTrue(ms2.hasObservers()); + + ms2.onError(new TestException()); + + ts.assertEmpty(); + + assertTrue(pp.hasSubscribers()); + + pp.onComplete(); + + ts.assertFailure(TestException.class); + } + + @Test + public void mainErrorInnerCompleteDelayError() { + PublishProcessor pp = PublishProcessor.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + TestSubscriber ts = pp.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms.hasObservers()); + + pp.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onComplete(); + + ts.assertFailure(TestException.class); + } + + @Test + public void mainErrorInnerSuccessDelayError() { + PublishProcessor pp = PublishProcessor.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + TestSubscriber ts = pp.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms.hasObservers()); + + pp.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onSuccess(1); + + ts.assertFailure(TestException.class, 1); + } + + @Test + public void mapperCrash() { + Flowable.just(1) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void disposeBeforeSwitchInOnNext() { + final TestSubscriber ts = new TestSubscriber(); + + Flowable.just(1) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + ts.cancel(); + return Maybe.just(1); + } + }).subscribe(ts); + + ts.assertEmpty(); + } + + @Test + public void disposeOnNextAfterFirst() { + final TestSubscriber ts = new TestSubscriber(); + + Flowable.just(1, 2) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v == 2) { + ts.cancel(); + } + return Maybe.just(1); + } + }).subscribe(ts); + + ts.assertValue(1) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void cancel() { + PublishProcessor pp = PublishProcessor.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + TestSubscriber ts = pp.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertEmpty(); + + assertTrue(pp.hasSubscribers()); + assertTrue(ms.hasObservers()); + + ts.cancel(); + + assertFalse(pp.hasSubscribers()); + assertFalse(ms.hasObservers()); + } + + @Test + public void mainErrorAfterTermination() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onError(new TestException("outer")); + } + } + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.error(new TestException("inner")); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "inner"); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "outer"); + } finally { + RxJavaPlugins.reset(); + } + } + + + @Test + public void innerErrorAfterTermination() { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> moRef = new AtomicReference>(); + + TestSubscriber ts = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onError(new TestException("outer")); + } + } + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return new Maybe() { + @Override + protected void subscribeActual( + MaybeObserver observer) { + observer.onSubscribe(Disposables.empty()); + moRef.set(observer); + } + }; + } + }) + .test(); + + ts.assertFailureAndMessage(TestException.class, "outer"); + + moRef.get().onError(new TestException("inner")); + moRef.get().onComplete(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void nextCancelRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final PublishProcessor pp = PublishProcessor.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + final TestSubscriber ts = pp.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertNoErrors() + .assertNotComplete(); + } + } + + @Test + public void nextInnerErrorRace() { + final TestException ex = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + final TestSubscriber ts = pp.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms; + } + return Maybe.never(); + } + }).test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ms.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + if (ts.errorCount() != 0) { + assertTrue(errors.isEmpty()); + ts.assertFailure(TestException.class); + } else if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void mainErrorInnerErrorRace() { + final TestException ex = new TestException(); + final TestException ex2 = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + final TestSubscriber ts = pp.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms; + } + return Maybe.never(); + } + }).test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ms.onError(ex2); + } + }; + + TestHelper.race(r1, r2); + + ts.assertError(new Predicate() { + @Override + public boolean test(Throwable e) throws Exception { + return e instanceof TestException || e instanceof CompositeException; + } + }); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void nextInnerSuccessRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final PublishProcessor pp = PublishProcessor.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + final TestSubscriber ts = pp.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms; + } + return Maybe.empty(); + } + }).test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ms.onSuccess(3); + } + }; + + TestHelper.race(r1, r2); + + ts.assertNoErrors() + .assertNotComplete(); + } + } + + @Test + public void requestMoreOnNext() { + TestSubscriber ts = new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + requestMore(1); + } + }; + Flowable.range(1, 5) + .switchMapMaybe(Functions.justFunction(Maybe.just(1))) + .subscribe(ts); + + ts.assertResult(1, 1, 1, 1, 1); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingleTest.java new file mode 100644 index 0000000000..3c6c832b76 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingleTest.java @@ -0,0 +1,606 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subjects.SingleSubject; +import io.reactivex.subscribers.TestSubscriber; + +public class FlowableSwitchMapSingleTest { + + @Test + public void simple() { + Flowable.range(1, 5) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void mainError() { + Flowable.error(new TestException()) + .switchMapSingle(Functions.justFunction(Single.never())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerError() { + Flowable.just(1) + .switchMapSingle(Functions.justFunction(Single.error(new TestException()))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) + throws Exception { + return f + .switchMapSingle(Functions.justFunction(Single.never())); + } + } + ); + } + + @Test + public void limit() { + Flowable.range(1, 5) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }) + .limit(3) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void switchOver() { + PublishProcessor pp = PublishProcessor.create(); + + final SingleSubject ms1 = SingleSubject.create(); + final SingleSubject ms2 = SingleSubject.create(); + + TestSubscriber ts = pp.switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms1; + } + return ms2; + } + }).test(); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms1.hasObservers()); + + pp.onNext(2); + + assertFalse(ms1.hasObservers()); + assertTrue(ms2.hasObservers()); + + ms2.onError(new TestException()); + + assertFalse(pp.hasSubscribers()); + + ts.assertFailure(TestException.class); + } + + @Test + public void switchOverDelayError() { + PublishProcessor pp = PublishProcessor.create(); + + final SingleSubject ms1 = SingleSubject.create(); + final SingleSubject ms2 = SingleSubject.create(); + + TestSubscriber ts = pp.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms1; + } + return ms2; + } + }).test(); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms1.hasObservers()); + + pp.onNext(2); + + assertFalse(ms1.hasObservers()); + assertTrue(ms2.hasObservers()); + + ms2.onError(new TestException()); + + ts.assertEmpty(); + + assertTrue(pp.hasSubscribers()); + + pp.onComplete(); + + ts.assertFailure(TestException.class); + } + + @Test + public void mainErrorInnerCompleteDelayError() { + PublishProcessor pp = PublishProcessor.create(); + + final SingleSubject ms = SingleSubject.create(); + + TestSubscriber ts = pp.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms.hasObservers()); + + pp.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onSuccess(1); + + ts.assertFailure(TestException.class, 1); + } + + @Test + public void mainErrorInnerSuccessDelayError() { + PublishProcessor pp = PublishProcessor.create(); + + final SingleSubject ms = SingleSubject.create(); + + TestSubscriber ts = pp.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms.hasObservers()); + + pp.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onSuccess(1); + + ts.assertFailure(TestException.class, 1); + } + + @Test + public void mapperCrash() { + Flowable.just(1) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void disposeBeforeSwitchInOnNext() { + final TestSubscriber ts = new TestSubscriber(); + + Flowable.just(1) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + ts.cancel(); + return Single.just(1); + } + }).subscribe(ts); + + ts.assertEmpty(); + } + + @Test + public void disposeOnNextAfterFirst() { + final TestSubscriber ts = new TestSubscriber(); + + Flowable.just(1, 2) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + if (v == 2) { + ts.cancel(); + } + return Single.just(1); + } + }).subscribe(ts); + + ts.assertValue(1) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void cancel() { + PublishProcessor pp = PublishProcessor.create(); + + final SingleSubject ms = SingleSubject.create(); + + TestSubscriber ts = pp.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertEmpty(); + + assertTrue(pp.hasSubscribers()); + assertTrue(ms.hasObservers()); + + ts.cancel(); + + assertFalse(pp.hasSubscribers()); + assertFalse(ms.hasObservers()); + } + + @Test + public void mainErrorAfterTermination() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onError(new TestException("outer")); + } + } + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.error(new TestException("inner")); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "inner"); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "outer"); + } finally { + RxJavaPlugins.reset(); + } + } + + + @Test + public void innerErrorAfterTermination() { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> moRef = new AtomicReference>(); + + TestSubscriber ts = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onError(new TestException("outer")); + } + } + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return new Single() { + @Override + protected void subscribeActual( + SingleObserver observer) { + observer.onSubscribe(Disposables.empty()); + moRef.set(observer); + } + }; + } + }) + .test(); + + ts.assertFailureAndMessage(TestException.class, "outer"); + + moRef.get().onError(new TestException("inner")); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void nextCancelRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final PublishProcessor pp = PublishProcessor.create(); + + final SingleSubject ms = SingleSubject.create(); + + final TestSubscriber ts = pp.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertNoErrors() + .assertNotComplete(); + } + } + + @Test + public void nextInnerErrorRace() { + final TestException ex = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + + final SingleSubject ms = SingleSubject.create(); + + final TestSubscriber ts = pp.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms; + } + return Single.never(); + } + }).test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ms.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + if (ts.errorCount() != 0) { + assertTrue(errors.isEmpty()); + ts.assertFailure(TestException.class); + } else if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void mainErrorInnerErrorRace() { + final TestException ex = new TestException(); + final TestException ex2 = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + + final SingleSubject ms = SingleSubject.create(); + + final TestSubscriber ts = pp.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms; + } + return Single.never(); + } + }).test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ms.onError(ex2); + } + }; + + TestHelper.race(r1, r2); + + ts.assertError(new Predicate() { + @Override + public boolean test(Throwable e) throws Exception { + return e instanceof TestException || e instanceof CompositeException; + } + }); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void nextInnerSuccessRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final PublishProcessor pp = PublishProcessor.create(); + + final SingleSubject ms = SingleSubject.create(); + + final TestSubscriber ts = pp.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms; + } + return Single.never(); + } + }).test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ms.onSuccess(3); + } + }; + + TestHelper.race(r1, r2); + + ts.assertNoErrors() + .assertNotComplete(); + } + } + + @Test + public void requestMoreOnNext() { + TestSubscriber ts = new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + requestMore(1); + } + }; + Flowable.range(1, 5) + .switchMapSingle(Functions.justFunction(Single.just(1))) + .subscribe(ts); + + ts.assertResult(1, 1, 1, 1, 1); + } + + @Test + public void backpressured() { + Flowable.just(1) + .switchMapSingle(Functions.justFunction(Single.just(1))) + .test(0) + .assertEmpty() + .requestMore(1) + .assertResult(1); + } +}