diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 6ecdb5409c..8fa075b680 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -2460,7 +2460,8 @@ public final Maybe flatMapMaybe(final Function Flowable flatMapPublisher(Function> mapper) { - return toFlowable().flatMap(mapper); + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new SingleFlatMapPublisher(this, mapper)); } /** diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapPublisher.java b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapPublisher.java new file mode 100644 index 0000000000..bdb7d166cd --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapPublisher.java @@ -0,0 +1,137 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import io.reactivex.Flowable; +import io.reactivex.FlowableSubscriber; +import io.reactivex.Scheduler; +import io.reactivex.SingleObserver; +import io.reactivex.SingleSource; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.SubscriptionHelper; + +/** + * A Flowable that emits items based on applying a specified function to the item emitted by the + * source Single, where that function returns a Publisher. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer + * and the {@code Publisher} returned by the mapper function is expected to honor it as well.
+ *
Scheduler:
+ *
{@code flatMapPublisher} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the source value type + * @param the result value type + * + * @see ReactiveX operators documentation: FlatMap + * @since 2.1.15 + */ +public final class SingleFlatMapPublisher extends Flowable { + + final SingleSource source; + final Function> mapper; + + public SingleFlatMapPublisher(SingleSource source, + Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(Subscriber actual) { + source.subscribe(new SingleFlatMapPublisherObserver(actual, mapper)); + } + + static final class SingleFlatMapPublisherObserver extends AtomicLong + implements SingleObserver, FlowableSubscriber, Subscription { + + private static final long serialVersionUID = 7759721921468635667L; + + final Subscriber actual; + final Function> mapper; + final AtomicReference parent; + Disposable disposable; + + SingleFlatMapPublisherObserver(Subscriber actual, + Function> mapper) { + this.actual = actual; + this.mapper = mapper; + this.parent = new AtomicReference(); + } + + @Override + public void onSubscribe(Disposable d) { + this.disposable = d; + actual.onSubscribe(this); + } + + @Override + public void onSuccess(S value) { + Publisher f; + try { + f = ObjectHelper.requireNonNull(mapper.apply(value), "the mapper returned a null Publisher"); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + actual.onError(e); + return; + } + f.subscribe(this); + } + + @Override + public void onSubscribe(Subscription s) { + SubscriptionHelper.deferredSetOnce(parent, this, s); + } + + @Override + public void onNext(T t) { + actual.onNext(t); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void request(long n) { + SubscriptionHelper.deferredRequest(parent, this, n); + } + + @Override + public void cancel() { + disposable.dispose(); + SubscriptionHelper.cancel(parent); + } + } + +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java index c2b1ecea91..9c0d81e28d 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java @@ -15,12 +15,15 @@ import static org.junit.Assert.*; +import java.util.concurrent.atomic.AtomicBoolean; + import org.junit.Test; import org.reactivestreams.Publisher; import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.subscribers.TestSubscriber; public class SingleFlatMapTest { @@ -126,6 +129,92 @@ public Publisher apply(Integer v) throws Exception { .test() .assertResult(1, 2, 3, 4, 5); } + + @Test(expected = NullPointerException.class) + public void flatMapPublisherMapperNull() { + Single.just(1).flatMapPublisher(null); + } + + @Test + public void flatMapPublisherMapperThrows() { + final TestException ex = new TestException(); + Single.just(1) + .flatMapPublisher(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + throw ex; + } + }) + .test() + .assertNoValues() + .assertError(ex); + } + + @Test + public void flatMapPublisherSingleError() { + final TestException ex = new TestException(); + Single.error(ex) + .flatMapPublisher(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return Flowable.just(1); + } + }) + .test() + .assertNoValues() + .assertError(ex); + } + + @Test + public void flatMapPublisherCancelDuringSingle() { + final AtomicBoolean disposed = new AtomicBoolean(); + TestSubscriber ts = Single.never() + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + disposed.set(true); + } + }) + .flatMapPublisher(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return Flowable.range(v, 5); + } + }) + .test() + .assertNoValues() + .assertNotTerminated(); + assertFalse(disposed.get()); + ts.cancel(); + assertTrue(disposed.get()); + ts.assertNotTerminated(); + } + + @Test + public void flatMapPublisherCancelDuringFlowable() { + final AtomicBoolean disposed = new AtomicBoolean(); + TestSubscriber ts = + Single.just(1) + .flatMapPublisher(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return Flowable.never() + .doOnCancel(new Action() { + @Override + public void run() throws Exception { + disposed.set(true); + } + }); + } + }) + .test() + .assertNoValues() + .assertNotTerminated(); + assertFalse(disposed.get()); + ts.cancel(); + assertTrue(disposed.get()); + ts.assertNotTerminated(); + } @Test(expected = NullPointerException.class) public void flatMapNull() {