Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add missing {Maybe|Single}.mergeDelayError variants #5799

Merged
merged 2 commits into from
Jan 9, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -1180,12 +1180,12 @@ public static <T> Flowable<T> mergeDelayError(Iterable<? extends MaybeSource<? e

/**
* Flattens a Publisher that emits MaybeSources into one Publisher, in a way that allows a Subscriber to
* receive all successfully emitted items from all of the source Publishers without being interrupted by
* an error notification from one of them.
* receive all successfully emitted items from all of the source MaybeSources without being interrupted by
* an error notification from one of them or even the main Publisher.
* <p>
* This behaves like {@link #merge(Publisher)} except that if any of the merged Publishers notify of an
* This behaves like {@link #merge(Publisher)} except that if any of the merged MaybeSources notify of an
* error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that
* error notification until all of the merged Publishers have finished emitting items.
* error notification until all of the merged MaybeSources and the main Publisher have finished emitting items.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
* <p>
Expand Down Expand Up @@ -1214,6 +1214,46 @@ public static <T> Flowable<T> mergeDelayError(Publisher<? extends MaybeSource<?
return Flowable.fromPublisher(sources).flatMap((Function)MaybeToPublisher.instance(), true);
}


/**
* Flattens a Publisher that emits MaybeSources into one Publisher, in a way that allows a Subscriber to
* receive all successfully emitted items from all of the source MaybeSources without being interrupted by
* an error notification from one of them or even the main Publisher as well as limiting the total number of active MaybeSources.
* <p>
* This behaves like {@link #merge(Publisher, int)} except that if any of the merged MaybeSources notify of an
* error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that
* error notification until all of the merged MaybeSources and the main Publisher have finished emitting items.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
* <p>
* Even if multiple merged Publishers send {@code onError} notifications, {@code mergeDelayError} will only
* invoke the {@code onError} method of its Subscribers once.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. The outer {@code Publisher} is consumed
* in unbounded mode (i.e., no backpressure is applied to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common element base type
* @param sources
* a Publisher that emits MaybeSources
* @param maxConcurrency the maximum number of active inner MaybeSources to be merged at a time
* @return a Flowable that emits all of the items emitted by the Publishers emitted by the
* {@code source} Publisher
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @since 2.1.9 - experimental
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public static <T> Flowable<T> mergeDelayError(Publisher<? extends MaybeSource<? extends T>> sources, int maxConcurrency) {
return Flowable.fromPublisher(sources).flatMap((Function)MaybeToPublisher.instance(), true, maxConcurrency);
}

/**
* Flattens two MaybeSources into one Flowable, in a way that allows a Subscriber to receive all
* successfully emitted items from each of the source MaybeSources without being interrupted by an error
Expand Down
177 changes: 176 additions & 1 deletion src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ public static <T> Flowable<T> merge(
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. Similarly, {@code Throwable}s
* signaled by source(s) after the returned {@code Flowable} has been cancelled or terminated with a
* (composite) error will be sent to the same global error handler.
* Use {@link #mergeDelayError(SingleSource, SingleSource, SingleSource)} to merge sources and terminate only when all source {@code SingleSource}s
* Use {@link #mergeDelayError(SingleSource, SingleSource, SingleSource, SingleSource)} to merge sources and terminate only when all source {@code SingleSource}s
* have completed or failed with an error.
* </dd>
* </dl>
Expand Down Expand Up @@ -913,6 +913,181 @@ public static <T> Flowable<T> merge(
return merge(Flowable.fromArray(source1, source2, source3, source4));
}


/**
* Merges an Iterable sequence of SingleSource instances into a single Flowable sequence,
* running all SingleSources at once and delaying any error(s) until all sources succeed or fail.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the common and resulting value type
* @param sources the Iterable sequence of SingleSource sources
* @return the new Flowable instance
* @since 2.1.9 - experimental
* @see #merge(Iterable)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public static <T> Flowable<T> mergeDelayError(Iterable<? extends SingleSource<? extends T>> sources) {
return mergeDelayError(Flowable.fromIterable(sources));
}

/**
* Merges a Flowable sequence of SingleSource instances into a single Flowable sequence,
* running all SingleSources at once and delaying any error(s) until all sources succeed or fail.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the common and resulting value type
* @param sources the Flowable sequence of SingleSource sources
* @return the new Flowable instance
* @see #merge(Publisher)
* @since 2.1.9 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
@Experimental
public static <T> Flowable<T> mergeDelayError(Publisher<? extends SingleSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, SingleInternalHelper.toFlowable(), true, Integer.MAX_VALUE, Flowable.bufferSize()));
}


/**
* Flattens two Singles into a single Flowable, without any transformation, delaying
* any error(s) until all sources succeed or fail.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by
* using the {@code mergeDelayError} method.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common value type
* @param source1
* a Single to be merged
* @param source2
* a Single to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #merge(SingleSource, SingleSource)
* @since 2.1.9 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
@Experimental
public static <T> Flowable<T> mergeDelayError(
SingleSource<? extends T> source1, SingleSource<? extends T> source2
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
return mergeDelayError(Flowable.fromArray(source1, source2));
}

/**
* Flattens three Singles into a single Flowable, without any transformation, delaying
* any error(s) until all sources succeed or fail.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using
* the {@code mergeDelayError} method.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common value type
* @param source1
* a Single to be merged
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically it's a SingleSource. dunno whether you want to distinguish between them

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll update it as Maybe.merge() say "a MaybeSource to be merged".

* @param source2
* a Single to be merged
* @param source3
* a Single to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #merge(SingleSource, SingleSource, SingleSource)
* @since 2.1.9 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
@Experimental
public static <T> Flowable<T> mergeDelayError(
SingleSource<? extends T> source1, SingleSource<? extends T> source2,
SingleSource<? extends T> source3
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
return mergeDelayError(Flowable.fromArray(source1, source2, source3));
}

/**
* Flattens four Singles into a single Flowable, without any transformation, delaying
* any error(s) until all sources succeed or fail.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using
* the {@code mergeDelayError} method.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common value type
* @param source1
* a Single to be merged
* @param source2
* a Single to be merged
* @param source3
* a Single to be merged
* @param source4
* a Single to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #merge(SingleSource, SingleSource, SingleSource, SingleSource)
* @since 2.1.9 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
@Experimental
public static <T> Flowable<T> mergeDelayError(
SingleSource<? extends T> source1, SingleSource<? extends T> source2,
SingleSource<? extends T> source3, SingleSource<? extends T> source4
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
return mergeDelayError(Flowable.fromArray(source1, source2, source3, source4));
}

/**
* Returns a singleton instance of a never-signalling Single (only calls onSubscribe).
* <dl>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.schedulers.Schedulers;

public class MaybeMergeTest {

@Test
public void delayErrorWithMaxConcurrency() {
Maybe.mergeDelayError(
Flowable.just(Maybe.just(1), Maybe.just(2), Maybe.just(3)), 1)
.test()
.assertResult(1, 2, 3);
}

@Test
public void delayErrorWithMaxConcurrencyError() {
Maybe.mergeDelayError(
Flowable.just(Maybe.just(1), Maybe.<Integer>error(new TestException()), Maybe.just(3)), 1)
.test()
.assertFailure(TestException.class, 1, 3);
}

@Test
public void delayErrorWithMaxConcurrencyAsync() {
final AtomicInteger count = new AtomicInteger();
@SuppressWarnings("unchecked")
Maybe<Integer>[] sources = new Maybe[3];
for (int i = 0; i < 3; i++) {
final int j = i + 1;
sources[i] = Maybe.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return count.incrementAndGet() - j;
}
})
.subscribeOn(Schedulers.io());
}

for (int i = 0; i < 1000; i++) {
count.set(0);
Maybe.mergeDelayError(
Flowable.fromArray(sources), 1)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(0, 0, 0);
}
}

@Test
public void delayErrorWithMaxConcurrencyAsyncError() {
final AtomicInteger count = new AtomicInteger();
@SuppressWarnings("unchecked")
Maybe<Integer>[] sources = new Maybe[3];
for (int i = 0; i < 3; i++) {
final int j = i + 1;
sources[i] = Maybe.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return count.incrementAndGet() - j;
}
})
.subscribeOn(Schedulers.io());
}
sources[1] = Maybe.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
throw new TestException("" + count.incrementAndGet());
}
})
.subscribeOn(Schedulers.io());

for (int i = 0; i < 1000; i++) {
count.set(0);
Maybe.mergeDelayError(
Flowable.fromArray(sources), 1)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailureAndMessage(TestException.class, "2", 0, 0);
}
}
}
Loading