Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add Flowable.concatMapCompletable{DelayError} operator #5871

Merged
merged 1 commit into from
Mar 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.reactivex.internal.functions.*;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.mixed.*;
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.internal.subscribers.*;
Expand Down Expand Up @@ -6886,6 +6887,168 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
return RxJavaPlugins.onAssembly(new FlowableConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
}

/**
* Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the
* other completes.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will
* signal a {@code MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function called with the upstream item and should return
* a {@code CompletableSource} to become the next source to
* be subscribed to
* @return a new Completable instance
* @since 2.1.11 - experimental
* @see #concatMapCompletableDelayError(Function)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
@Experimental
public final Completable concatMapCompletable(Function<? super T, ? extends CompletableSource> mapper) {
return concatMapCompletable(mapper, 2);
}

/**
* Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the
* other completes.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will
* signal a {@code MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function called with the upstream item and should return
* a {@code CompletableSource} to become the next source to
* be subscribed to
* @param prefetch The number of upstream items to prefetch so that fresh items are
* ready to be mapped when a previous {@code CompletableSource} terminates.
* The operator replenishes after half of the prefetch amount has been consumed
* and turned into {@code CompletableSource}s.
* @return a new Completable instance
* @since 2.1.11 - experimental
* @see #concatMapCompletableDelayError(Function, boolean, int)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
@Experimental
public final Completable concatMapCompletable(Function<? super T, ? extends CompletableSource> mapper, int prefetch) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMapCompletable<T>(this, mapper, ErrorMode.IMMEDIATE, prefetch));
}

/**
* Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the
* other terminates, delaying all errors till both this {@code Flowable} and all
* inner {@code CompletableSource}s terminate.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will
* signal a {@code MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function called with the upstream item and should return
* a {@code CompletableSource} to become the next source to
* be subscribed to
* @return a new Completable instance
* @since 2.1.11 - experimental
* @see #concatMapCompletable(Function, int)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
@Experimental
public final Completable concatMapCompletableDelayError(Function<? super T, ? extends CompletableSource> mapper) {
return concatMapCompletableDelayError(mapper, true, 2);
}

/**
* Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the
* other terminates, optionally delaying all errors till both this {@code Flowable} and all
* inner {@code CompletableSource}s terminate.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will
* signal a {@code MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function called with the upstream item and should return
* a {@code CompletableSource} to become the next source to
* be subscribed to
* @param tillTheEnd If {@code true}, errors from this {@code Flowable} or any of the
* inner {@code CompletableSource}s are delayed until all
* of them terminate. If {@code false}, an error from this
* {@code Flowable} is delayed until the current inner
* {@code CompletableSource} terminates and only then is
* it emitted to the downstream.
* @return a new Completable instance
* @since 2.1.11 - experimental
* @see #concatMapCompletable(Function)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
@Experimental
public final Completable concatMapCompletableDelayError(Function<? super T, ? extends CompletableSource> mapper, boolean tillTheEnd) {
return concatMapCompletableDelayError(mapper, tillTheEnd, 2);
}

/**
* Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the
* other terminates, optionally delaying all errors till both this {@code Flowable} and all
* inner {@code CompletableSource}s terminate.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will
* signal a {@code MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function called with the upstream item and should return
* a {@code CompletableSource} to become the next source to
* be subscribed to
* @param tillTheEnd If {@code true}, errors from this {@code Flowable} or any of the
* inner {@code CompletableSource}s are delayed until all
* of them terminate. If {@code false}, an error from this
* {@code Flowable} is delayed until the current inner
* {@code CompletableSource} terminates and only then is
* it emitted to the downstream.
* @param prefetch The number of upstream items to prefetch so that fresh items are
* ready to be mapped when a previous {@code CompletableSource} terminates.
* The operator replenishes after half of the prefetch amount has been consumed
* and turned into {@code CompletableSource}s.
* @return a new Completable instance
* @since 2.1.11 - experimental
* @see #concatMapCompletable(Function, int)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
@Experimental
public final Completable concatMapCompletableDelayError(Function<? super T, ? extends CompletableSource> mapper, boolean tillTheEnd, int prefetch) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMapCompletable<T>(this, mapper, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, prefetch));
}

/**
* Maps each of the items into a Publisher, subscribes to them one after the other,
* one at a time and emits their values in order
Expand Down
Loading