Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add missing {Maybe|Single}.mergeDelayError variants #5799

Merged
merged 2 commits into from
Jan 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -1180,12 +1180,12 @@ public static <T> Flowable<T> mergeDelayError(Iterable<? extends MaybeSource<? e

/**
* Flattens a Publisher that emits MaybeSources into one Publisher, in a way that allows a Subscriber to
* receive all successfully emitted items from all of the source Publishers without being interrupted by
* an error notification from one of them.
* receive all successfully emitted items from all of the source MaybeSources without being interrupted by
* an error notification from one of them or even the main Publisher.
* <p>
* This behaves like {@link #merge(Publisher)} except that if any of the merged Publishers notify of an
* This behaves like {@link #merge(Publisher)} except that if any of the merged MaybeSources notify of an
* error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that
* error notification until all of the merged Publishers have finished emitting items.
* error notification until all of the merged MaybeSources and the main Publisher have finished emitting items.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
* <p>
Expand Down Expand Up @@ -1214,6 +1214,46 @@ public static <T> Flowable<T> mergeDelayError(Publisher<? extends MaybeSource<?
return Flowable.fromPublisher(sources).flatMap((Function)MaybeToPublisher.instance(), true);
}


/**
* Flattens a Publisher that emits MaybeSources into one Publisher, in a way that allows a Subscriber to
* receive all successfully emitted items from all of the source MaybeSources without being interrupted by
* an error notification from one of them or even the main Publisher as well as limiting the total number of active MaybeSources.
* <p>
* This behaves like {@link #merge(Publisher, int)} except that if any of the merged MaybeSources notify of an
* error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that
* error notification until all of the merged MaybeSources and the main Publisher have finished emitting items.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
* <p>
* Even if multiple merged Publishers send {@code onError} notifications, {@code mergeDelayError} will only
* invoke the {@code onError} method of its Subscribers once.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. The outer {@code Publisher} is consumed
* in unbounded mode (i.e., no backpressure is applied to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common element base type
* @param sources
* a Publisher that emits MaybeSources
* @param maxConcurrency the maximum number of active inner MaybeSources to be merged at a time
* @return a Flowable that emits all of the items emitted by the Publishers emitted by the
* {@code source} Publisher
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @since 2.1.9 - experimental
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public static <T> Flowable<T> mergeDelayError(Publisher<? extends MaybeSource<? extends T>> sources, int maxConcurrency) {
return Flowable.fromPublisher(sources).flatMap((Function)MaybeToPublisher.instance(), true, maxConcurrency);
}

/**
* Flattens two MaybeSources into one Flowable, in a way that allows a Subscriber to receive all
* successfully emitted items from each of the source MaybeSources without being interrupted by an error
Expand Down
197 changes: 186 additions & 11 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -787,9 +787,9 @@ public static <T> Single<T> merge(SingleSource<? extends SingleSource<? extends
*
* @param <T> the common value type
* @param source1
* a Single to be merged
* a SingleSource to be merged
* @param source2
* a Single to be merged
* a SingleSource to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #mergeDelayError(SingleSource, SingleSource)
Expand Down Expand Up @@ -835,11 +835,11 @@ public static <T> Flowable<T> merge(
*
* @param <T> the common value type
* @param source1
* a Single to be merged
* a SingleSource to be merged
* @param source2
* a Single to be merged
* a SingleSource to be merged
* @param source3
* a Single to be merged
* a SingleSource to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #mergeDelayError(SingleSource, SingleSource, SingleSource)
Expand Down Expand Up @@ -880,20 +880,20 @@ public static <T> Flowable<T> merge(
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. Similarly, {@code Throwable}s
* signaled by source(s) after the returned {@code Flowable} has been cancelled or terminated with a
* (composite) error will be sent to the same global error handler.
* Use {@link #mergeDelayError(SingleSource, SingleSource, SingleSource)} to merge sources and terminate only when all source {@code SingleSource}s
* Use {@link #mergeDelayError(SingleSource, SingleSource, SingleSource, SingleSource)} to merge sources and terminate only when all source {@code SingleSource}s
* have completed or failed with an error.
* </dd>
* </dl>
*
* @param <T> the common value type
* @param source1
* a Single to be merged
* a SingleSource to be merged
* @param source2
* a Single to be merged
* a SingleSource to be merged
* @param source3
* a Single to be merged
* a SingleSource to be merged
* @param source4
* a Single to be merged
* a SingleSource to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #mergeDelayError(SingleSource, SingleSource, SingleSource, SingleSource)
Expand All @@ -913,6 +913,181 @@ public static <T> Flowable<T> merge(
return merge(Flowable.fromArray(source1, source2, source3, source4));
}


/**
* Merges an Iterable sequence of SingleSource instances into a single Flowable sequence,
* running all SingleSources at once and delaying any error(s) until all sources succeed or fail.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the common and resulting value type
* @param sources the Iterable sequence of SingleSource sources
* @return the new Flowable instance
* @since 2.1.9 - experimental
* @see #merge(Iterable)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public static <T> Flowable<T> mergeDelayError(Iterable<? extends SingleSource<? extends T>> sources) {
return mergeDelayError(Flowable.fromIterable(sources));
}

/**
* Merges a Flowable sequence of SingleSource instances into a single Flowable sequence,
* running all SingleSources at once and delaying any error(s) until all sources succeed or fail.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the common and resulting value type
* @param sources the Flowable sequence of SingleSource sources
* @return the new Flowable instance
* @see #merge(Publisher)
* @since 2.1.9 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
@Experimental
public static <T> Flowable<T> mergeDelayError(Publisher<? extends SingleSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, SingleInternalHelper.toFlowable(), true, Integer.MAX_VALUE, Flowable.bufferSize()));
}


/**
* Flattens two Singles into a single Flowable, without any transformation, delaying
* any error(s) until all sources succeed or fail.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by
* using the {@code mergeDelayError} method.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common value type
* @param source1
* a SingleSource to be merged
* @param source2
* a SingleSource to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #merge(SingleSource, SingleSource)
* @since 2.1.9 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
@Experimental
public static <T> Flowable<T> mergeDelayError(
SingleSource<? extends T> source1, SingleSource<? extends T> source2
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
return mergeDelayError(Flowable.fromArray(source1, source2));
}

/**
* Flattens three Singles into a single Flowable, without any transformation, delaying
* any error(s) until all sources succeed or fail.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using
* the {@code mergeDelayError} method.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common value type
* @param source1
* a SingleSource to be merged
* @param source2
* a SingleSource to be merged
* @param source3
* a SingleSource to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #merge(SingleSource, SingleSource, SingleSource)
* @since 2.1.9 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
@Experimental
public static <T> Flowable<T> mergeDelayError(
SingleSource<? extends T> source1, SingleSource<? extends T> source2,
SingleSource<? extends T> source3
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
return mergeDelayError(Flowable.fromArray(source1, source2, source3));
}

/**
* Flattens four Singles into a single Flowable, without any transformation, delaying
* any error(s) until all sources succeed or fail.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using
* the {@code mergeDelayError} method.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common value type
* @param source1
* a SingleSource to be merged
* @param source2
* a SingleSource to be merged
* @param source3
* a SingleSource to be merged
* @param source4
* a SingleSource to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #merge(SingleSource, SingleSource, SingleSource, SingleSource)
* @since 2.1.9 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
@Experimental
public static <T> Flowable<T> mergeDelayError(
SingleSource<? extends T> source1, SingleSource<? extends T> source2,
SingleSource<? extends T> source3, SingleSource<? extends T> source4
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
return mergeDelayError(Flowable.fromArray(source1, source2, source3, source4));
}

/**
* Returns a singleton instance of a never-signalling Single (only calls onSubscribe).
* <dl>
Expand Down Expand Up @@ -2417,7 +2592,7 @@ public final Single<Boolean> contains(final Object value, final BiPredicate<Obje
* </dl>
*
* @param other
* a Single to be merged
* a SingleSource to be merged
* @return that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
Expand Down
Loading