Skip to content

Commit

Permalink
Single#concat(Publisher) onNext error propagation (#1615)
Browse files Browse the repository at this point in the history
Motivation:
Single#concat(Publisher) doesn't catch exceptions from downstream
Subscriber#onNext and relies upon upstream to propagate exceptions.
However upstream is a Single and has already terminated by the time the
exception is caught so it cannot send a duplicate terminal signal.

Modifications:
- Single#concat(Publisher) catches exceptions from downstream
  Subscriber#onNext and propagates an error.

Result:
Single#concat(Publisher) is more robust with respect to exception
handling.
  • Loading branch information
Scottmitch authored Jun 11, 2021
1 parent 4f7110b commit 1ef82dd
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ public void cancel() {
}

private void emitSingleSuccessToTarget(@Nullable final T result) {
target.onNext(result);
try {
target.onNext(result);
} catch (Throwable cause) {
target.onError(cause);
return;
}
next.subscribeInternal(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ void setUp() {
subscriber.awaitSubscription();
}

@Test
void onNextErrorPropagated() {
subscriber = new TestPublisherSubscriber<>();
source = new TestSingle<>();
toSource(source.concat(next).<Integer>map(x -> {
throw DELIBERATE_EXCEPTION;
})).subscribe(subscriber);
subscriber.awaitSubscription().request(1);
source.onSuccess(1);
assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
assertThat(next.isSubscribed(), is(false));
}

@Test
void bothCompletion() {
triggerNextSubscribe();
Expand Down

0 comments on commit 1ef82dd

Please sign in to comment.