Skip to content

Commit

Permalink
2.x: add full implementation for Single.flatMapPublisher so doesn't b…
Browse files Browse the repository at this point in the history
…atch requests (ReactiveX#6015)
  • Loading branch information
davidmoten committed May 26, 2018
1 parent 07e126f commit e5244e9
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2460,7 +2460,8 @@ public final <R> Maybe<R> flatMapMaybe(final Function<? super T, ? extends Maybe
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publisher<? extends R>> mapper) {
return toFlowable().flatMap(mapper);
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlatMapPublisher<T, R>(this, mapper));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.single;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import io.reactivex.Scheduler;
import io.reactivex.SingleObserver;
import io.reactivex.SingleSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;

/**
* A Flowable that emits items based on applying a specified function to the item emitted by the
* source Single, where that function returns a Publisher.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMapPublisher.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer
* and the {@code Publisher} returned by the mapper function is expected to honor it as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapPublisher} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the source value type
* @param <R> the result value type
*
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 2.1.15
*/
public final class SingleFlatMapPublisher<T, R> extends Flowable<R> {

final SingleSource<T> source;
final Function<? super T, ? extends Publisher<? extends R>> mapper;

public SingleFlatMapPublisher(SingleSource<T> source,
Function<? super T, ? extends Publisher<? extends R>> mapper) {
this.source = source;
this.mapper = mapper;
}

@Override
protected void subscribeActual(Subscriber<? super R> actual) {
source.subscribe(new SingleFlatMapPublisherObserver<T, R>(actual, mapper));
}

static final class SingleFlatMapPublisherObserver<S, T> extends AtomicLong
implements SingleObserver<S>, FlowableSubscriber<T>, Subscription {

private static final long serialVersionUID = 7759721921468635667L;

final Subscriber<? super T> actual;
final Function<? super S, ? extends Publisher<? extends T>> mapper;
final AtomicReference<Subscription> parent;
Disposable disposable;

SingleFlatMapPublisherObserver(Subscriber<? super T> actual,
Function<? super S, ? extends Publisher<? extends T>> mapper) {
this.actual = actual;
this.mapper = mapper;
this.parent = new AtomicReference<Subscription>();
}

@Override
public void onSubscribe(Disposable d) {
this.disposable = d;
actual.onSubscribe(this);
}

@Override
public void onSuccess(S value) {
Publisher<? extends T> f;
try {
f = ObjectHelper.requireNonNull(mapper.apply(value), "the mapper returned a null Publisher");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
return;
}
f.subscribe(this);
}

@Override
public void onSubscribe(Subscription s) {
SubscriptionHelper.deferredSetOnce(parent, this, s);
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onComplete() {
actual.onComplete();
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}

@Override
public void request(long n) {
SubscriptionHelper.deferredRequest(parent, this, n);
}

@Override
public void cancel() {
disposable.dispose();
SubscriptionHelper.cancel(parent);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@

import static org.junit.Assert.*;

import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;
import org.reactivestreams.Publisher;

import io.reactivex.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.subscribers.TestSubscriber;

public class SingleFlatMapTest {

Expand Down Expand Up @@ -126,6 +129,92 @@ public Publisher<Integer> apply(Integer v) throws Exception {
.test()
.assertResult(1, 2, 3, 4, 5);
}

@Test(expected = NullPointerException.class)
public void flatMapPublisherMapperNull() {
Single.just(1).flatMapPublisher(null);
}

@Test
public void flatMapPublisherMapperThrows() {
final TestException ex = new TestException();
Single.just(1)
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v) throws Exception {
throw ex;
}
})
.test()
.assertNoValues()
.assertError(ex);
}

@Test
public void flatMapPublisherSingleError() {
final TestException ex = new TestException();
Single.<Integer>error(ex)
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v) throws Exception {
return Flowable.just(1);
}
})
.test()
.assertNoValues()
.assertError(ex);
}

@Test
public void flatMapPublisherCancelDuringSingle() {
final AtomicBoolean disposed = new AtomicBoolean();
TestSubscriber<Integer> ts = Single.<Integer>never()
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
disposed.set(true);
}
})
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v) throws Exception {
return Flowable.range(v, 5);
}
})
.test()
.assertNoValues()
.assertNotTerminated();
assertFalse(disposed.get());
ts.cancel();
assertTrue(disposed.get());
ts.assertNotTerminated();
}

@Test
public void flatMapPublisherCancelDuringFlowable() {
final AtomicBoolean disposed = new AtomicBoolean();
TestSubscriber<Integer> ts =
Single.just(1)
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v) throws Exception {
return Flowable.<Integer>never()
.doOnCancel(new Action() {
@Override
public void run() throws Exception {
disposed.set(true);
}
});
}
})
.test()
.assertNoValues()
.assertNotTerminated();
assertFalse(disposed.get());
ts.cancel();
assertTrue(disposed.get());
ts.assertNotTerminated();
}

@Test(expected = NullPointerException.class)
public void flatMapNull() {
Expand Down

0 comments on commit e5244e9

Please sign in to comment.