Skip to content

Commit

Permalink
Single.concat(Publisher) defer subscribe to Publisher until reque…
Browse files Browse the repository at this point in the history
…sted (#1643)

Motivation:

Current version of `Single.concat(Publisher)` subscribes to the next
`Publisher` as soon as the `Single` completes. If the passed `Publisher`
does not support multiple subscribes, users can not apply retries if the
further processing of the single result fails.

Modifications:

- Add `Single.concat(Publisher, boolean)` overload that tells if we
need to subscribe to the next `Publisher` asap or defer subscribe until
more items are requested;
- Implement another `ConcatDeferNextSubscriber` variant that defers
subscribe to the next publisher, share common code in
`AbstractConcatSubscriber`;
- Enhance tests for new operator variant;

Result:

If processing of the first item failed and no more items were requested,
it's safe to apply retry operator even with a `Publisher` that do not
support re-subscribe.
  • Loading branch information
idelpivnitskiy authored and bondolo committed Jul 2, 2021
1 parent f615202 commit ea2f56c
Show file tree
Hide file tree
Showing 5 changed files with 562 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public final <E extends Throwable> Single<T> onErrorReturn(
public final Single<T> onErrorReturn(Predicate<? super Throwable> predicate,
Function<? super Throwable, ? extends T> itemSupplier) {
requireNonNull(itemSupplier);
return onErrorResume(predicate, t -> Single.succeeded(itemSupplier.apply(t)));
return onErrorResume(predicate, t -> succeeded(itemSupplier.apply(t)));
}

/**
Expand Down Expand Up @@ -858,6 +858,10 @@ public final Single<T> concat(Completable next) {
* elements from {@code next} {@link Publisher}. Any error emitted by this {@link Single} or {@code next}
* {@link Publisher} is forwarded to the returned {@link Publisher}.
* <p>
* Note: this method is an overload for {@link #concat(Publisher, boolean)} with {@code deferSubscribe} equal to
* {@code false}, which triggers subscribe to the {@code next} {@link Publisher} as soon as {@code this}
* {@link Single} completes successfully.
* <p>
* This method provides a means to sequence the execution of two asynchronous sources and in sequential programming
* is similar to:
* <pre>{@code
Expand All @@ -869,9 +873,37 @@ public final Single<T> concat(Completable next) {
* @param next {@link Publisher} to concat.
* @return New {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits
* all elements from {@code next} {@link Publisher}.
* @see #concat(Publisher, boolean)
*/
public final Publisher<T> concat(Publisher<? extends T> next) {
return new SingleConcatWithPublisher<>(this, next, executor);
return new SingleConcatWithPublisher<>(this, next, false, executor);
}

/**
* Returns a {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits all
* elements from {@code next} {@link Publisher}. Any error emitted by this {@link Single} or {@code next}
* {@link Publisher} is forwarded to the returned {@link Publisher}.
* <p>
* This method provides a means to sequence the execution of two asynchronous sources and in sequential programming
* is similar to:
* <pre>{@code
* List<T> results = new ...;
* results.add(resultOfThisSingle());
* results.addAll(nextStream());
* return results;
* }</pre>
* @param next {@link Publisher} to concat.
* @param deferSubscribe if {@code true} subscribe to the {@code next} {@link Publisher} will be deferred until
* demand is received. Otherwise, it subscribes to the {@code next} {@link Publisher} as soon as {@code this}
* {@link Single} completes successfully. Choosing the deferred ({@code true}) behavior is important if the
* {@code next} {@link Publisher} does not or might not support multiple subscribers (non-replayable). Choosing the
* immediate subscribe ({@code false}) behavior may have better performance and may be a preferable choice for
* replayable {@link Publisher}(s) or when eager subscribe is beneficial.
* @return New {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits
* all elements from {@code next} {@link Publisher}.
*/
public final Publisher<T> concat(Publisher<? extends T> next, boolean deferSubscribe) {
return new SingleConcatWithPublisher<>(this, next, deferSubscribe, executor);
}

/**
Expand Down
Loading

0 comments on commit ea2f56c

Please sign in to comment.