diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java index 62e0e2ba21..be47c1d05d 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java @@ -1771,6 +1771,21 @@ public final Cancellable subscribe(Runnable onComplete) { return subscriber; } + /** + * Subscribe to this {@link Completable}, invoke passed {@link Runnable} when this {@link Completable} terminates + * successfully or emit an error to {@link Consumer} when it fails. + * + * @param onComplete {@link Runnable} to invoke when this {@link Completable} terminates successfully. + * @param errorConsumer {@link Consumer} to accept the error when this {@link Completable} fails. + * @return {@link Cancellable} used to invoke {@link Cancellable#cancel()} on the parameter of + * {@link Subscriber#onSubscribe(Cancellable)} for this {@link Completable}. + */ + public final Cancellable subscribe(Runnable onComplete, Consumer errorConsumer) { + SimpleCompletableSubscriber subscriber = new SimpleCompletableSubscriber(onComplete, errorConsumer); + subscribeInternal(subscriber); + return subscriber; + } + /** * Handles a subscriber to this {@code Completable}. *

diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleCompletableSubscriber.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleCompletableSubscriber.java index 09096ef229..998208d942 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleCompletableSubscriber.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleCompletableSubscriber.java @@ -22,13 +22,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.function.Consumer; +import javax.annotation.Nullable; + import static java.util.Objects.requireNonNull; final class SimpleCompletableSubscriber extends SequentialCancellable implements CompletableSource.Subscriber { - private static final Runnable NOOP_RUNNABLE = () -> { }; private static final Logger LOGGER = LoggerFactory.getLogger(SimpleCompletableSubscriber.class); + private static final Runnable NOOP_RUNNABLE = () -> { }; private final Runnable onComplete; + @Nullable + private final Consumer errorConsumer; SimpleCompletableSubscriber() { this(NOOP_RUNNABLE); @@ -36,6 +41,13 @@ final class SimpleCompletableSubscriber extends SequentialCancellable implements SimpleCompletableSubscriber(final Runnable onComplete) { this.onComplete = requireNonNull(onComplete); + this.errorConsumer = null; + } + + SimpleCompletableSubscriber(final Runnable onComplete, + final Consumer errorConsumer) { + this.onComplete = requireNonNull(onComplete); + this.errorConsumer = requireNonNull(errorConsumer); } @Override @@ -54,6 +66,10 @@ public void onComplete() { @Override public void onError(Throwable t) { - LOGGER.debug("Received exception from the source.", t); + if (errorConsumer != null) { + errorConsumer.accept(t); + } else { + LOGGER.debug("Received exception from the source.", t); + } } } diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleSingleSubscriber.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleSingleSubscriber.java index eb20e9647a..57f4bc9eee 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleSingleSubscriber.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleSingleSubscriber.java @@ -32,9 +32,18 @@ final class SimpleSingleSubscriber extends SequentialCancellable implements S private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSingleSubscriber.class); private final Consumer resultConsumer; + @Nullable + private final Consumer errorConsumer; - SimpleSingleSubscriber(Consumer resultConsumer) { + SimpleSingleSubscriber(final Consumer resultConsumer) { this.resultConsumer = requireNonNull(resultConsumer); + this.errorConsumer = null; + } + + SimpleSingleSubscriber(final Consumer resultConsumer, + final Consumer errorConsumer) { + this.resultConsumer = requireNonNull(resultConsumer); + this.errorConsumer = requireNonNull(errorConsumer); } @Override @@ -44,11 +53,19 @@ public void onSubscribe(Cancellable cancellable) { @Override public void onSuccess(@Nullable T result) { - resultConsumer.accept(result); + try { + resultConsumer.accept(result); + } catch (Throwable t) { + LOGGER.debug("Received exception from the result consumer {}.", resultConsumer, t); + } } @Override public void onError(Throwable t) { - LOGGER.debug("Received exception from the source.", t); + if (errorConsumer != null) { + errorConsumer.accept(t); + } else { + LOGGER.debug("Received exception from the source.", t); + } } } diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java index 8b733b58bc..7a8f6b1eb8 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java @@ -1879,11 +1879,10 @@ protected final void subscribeInternal(Subscriber subscriber) { } /** - * Subscribe to this {@link Single}, emits the result to the passed {@link Consumer} and log any - * {@link Subscriber#onError(Throwable)}. - * - * @param resultConsumer {@link Consumer} to accept the result of this {@link Single}. + * Subscribe to this {@link Single}, emit the result to the passed {@link Consumer} and log any + * {@link Subscriber#onError(Throwable)} at debug level. * + * @param resultConsumer {@link Consumer} to accept the result of this {@link Single} if it succeeds. * @return {@link Cancellable} used to invoke {@link Cancellable#cancel()} on the parameter of * {@link Subscriber#onSubscribe(Cancellable)} for this {@link Single}. */ @@ -1893,6 +1892,21 @@ public final Cancellable subscribe(Consumer resultConsumer) { return subscriber; } + /** + * Subscribe to this {@link Single}, emit the result or error to one of the passed {@link Consumer}s. + * + * @param resultConsumer {@link Consumer} to accept the result of this {@link Single} if it succeeds. + * @param errorConsumer {@link Consumer} to accept the error of this {@link Single} if it fails. + * @return {@link Cancellable} used to invoke {@link Cancellable#cancel()} on the parameter of + * {@link Subscriber#onSubscribe(Cancellable)} for this {@link Single}. + */ + public final Cancellable subscribe(Consumer resultConsumer, + Consumer errorConsumer) { + SimpleSingleSubscriber subscriber = new SimpleSingleSubscriber<>(resultConsumer, errorConsumer); + subscribeInternal(subscriber); + return subscriber; + } + /** * Handles a subscriber to this {@link Single}. *