diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java index 62e0e2ba21..be47c1d05d 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java @@ -1771,6 +1771,21 @@ public final Cancellable subscribe(Runnable onComplete) { return subscriber; } + /** + * Subscribe to this {@link Completable}, invoke passed {@link Runnable} when this {@link Completable} terminates + * successfully or emit an error to {@link Consumer} when it fails. + * + * @param onComplete {@link Runnable} to invoke when this {@link Completable} terminates successfully. + * @param errorConsumer {@link Consumer} to accept the error when this {@link Completable} fails. + * @return {@link Cancellable} used to invoke {@link Cancellable#cancel()} on the parameter of + * {@link Subscriber#onSubscribe(Cancellable)} for this {@link Completable}. + */ + public final Cancellable subscribe(Runnable onComplete, Consumer super Throwable> errorConsumer) { + SimpleCompletableSubscriber subscriber = new SimpleCompletableSubscriber(onComplete, errorConsumer); + subscribeInternal(subscriber); + return subscriber; + } + /** * Handles a subscriber to this {@code Completable}. *
diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleCompletableSubscriber.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleCompletableSubscriber.java
index 09096ef229..998208d942 100644
--- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleCompletableSubscriber.java
+++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleCompletableSubscriber.java
@@ -22,13 +22,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+
import static java.util.Objects.requireNonNull;
final class SimpleCompletableSubscriber extends SequentialCancellable implements CompletableSource.Subscriber {
- private static final Runnable NOOP_RUNNABLE = () -> { };
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleCompletableSubscriber.class);
+ private static final Runnable NOOP_RUNNABLE = () -> { };
private final Runnable onComplete;
+ @Nullable
+ private final Consumer super Throwable> errorConsumer;
SimpleCompletableSubscriber() {
this(NOOP_RUNNABLE);
@@ -36,6 +41,13 @@ final class SimpleCompletableSubscriber extends SequentialCancellable implements
SimpleCompletableSubscriber(final Runnable onComplete) {
this.onComplete = requireNonNull(onComplete);
+ this.errorConsumer = null;
+ }
+
+ SimpleCompletableSubscriber(final Runnable onComplete,
+ final Consumer super Throwable> errorConsumer) {
+ this.onComplete = requireNonNull(onComplete);
+ this.errorConsumer = requireNonNull(errorConsumer);
}
@Override
@@ -54,6 +66,10 @@ public void onComplete() {
@Override
public void onError(Throwable t) {
- LOGGER.debug("Received exception from the source.", t);
+ if (errorConsumer != null) {
+ errorConsumer.accept(t);
+ } else {
+ LOGGER.debug("Received exception from the source.", t);
+ }
}
}
diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleSingleSubscriber.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleSingleSubscriber.java
index eb20e9647a..57f4bc9eee 100644
--- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleSingleSubscriber.java
+++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SimpleSingleSubscriber.java
@@ -32,9 +32,18 @@ final class SimpleSingleSubscriber