Skip to content

Commit

Permalink
Single/Completable subscribe overloads to handle failures (#3112)
Browse files Browse the repository at this point in the history
Motivation:

Currently, users can consume the result or execute a `Runnable` for
successful termination of a `Single`/`Completable`. Failures will be
logged at debug level. Users can only use `whenOnError` operator to
intercept the error, but it may result in duplicated logging or users
simply forget to handle the error case.

Modifications:

- Add `Single.subscribe(Consumer<? super T>, Consumer<? super Throwable>)`;
- Add `Completable.subscribe(Runnable, Consumer<? super Throwable>)`;
- Modify `SimpleSingleSubscriber/SimpleCompletableSubscriber` to handle
error when `errorConsumer` is provided.

Result:

Users can easily handle error cases when they subscribe to `Single` or
`Completable`.
  • Loading branch information
idelpivnitskiy authored Nov 18, 2024
1 parent d7dd886 commit 7a05d7d
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,21 @@ public final Cancellable subscribe(Runnable onComplete) {
return subscriber;
}

/**
* Subscribe to this {@link Completable}, invoke passed {@link Runnable} when this {@link Completable} terminates
* successfully or emit an error to {@link Consumer} when it fails.
*
* @param onComplete {@link Runnable} to invoke when this {@link Completable} terminates successfully.
* @param errorConsumer {@link Consumer} to accept the error when this {@link Completable} fails.
* @return {@link Cancellable} used to invoke {@link Cancellable#cancel()} on the parameter of
* {@link Subscriber#onSubscribe(Cancellable)} for this {@link Completable}.
*/
public final Cancellable subscribe(Runnable onComplete, Consumer<? super Throwable> errorConsumer) {
SimpleCompletableSubscriber subscriber = new SimpleCompletableSubscriber(onComplete, errorConsumer);
subscribeInternal(subscriber);
return subscriber;
}

/**
* Handles a subscriber to this {@code Completable}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,32 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Consumer;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;

final class SimpleCompletableSubscriber extends SequentialCancellable implements CompletableSource.Subscriber {

private static final Runnable NOOP_RUNNABLE = () -> { };
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleCompletableSubscriber.class);
private static final Runnable NOOP_RUNNABLE = () -> { };
private final Runnable onComplete;
@Nullable
private final Consumer<? super Throwable> errorConsumer;

SimpleCompletableSubscriber() {
this(NOOP_RUNNABLE);
}

SimpleCompletableSubscriber(final Runnable onComplete) {
this.onComplete = requireNonNull(onComplete);
this.errorConsumer = null;
}

SimpleCompletableSubscriber(final Runnable onComplete,
final Consumer<? super Throwable> errorConsumer) {
this.onComplete = requireNonNull(onComplete);
this.errorConsumer = requireNonNull(errorConsumer);
}

@Override
Expand All @@ -54,6 +66,10 @@ public void onComplete() {

@Override
public void onError(Throwable t) {
LOGGER.debug("Received exception from the source.", t);
if (errorConsumer != null) {
errorConsumer.accept(t);
} else {
LOGGER.debug("Received exception from the source.", t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,18 @@ final class SimpleSingleSubscriber<T> extends SequentialCancellable implements S
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSingleSubscriber.class);

private final Consumer<? super T> resultConsumer;
@Nullable
private final Consumer<? super Throwable> errorConsumer;

SimpleSingleSubscriber(Consumer<? super T> resultConsumer) {
SimpleSingleSubscriber(final Consumer<? super T> resultConsumer) {
this.resultConsumer = requireNonNull(resultConsumer);
this.errorConsumer = null;
}

SimpleSingleSubscriber(final Consumer<? super T> resultConsumer,
final Consumer<? super Throwable> errorConsumer) {
this.resultConsumer = requireNonNull(resultConsumer);
this.errorConsumer = requireNonNull(errorConsumer);
}

@Override
Expand All @@ -44,11 +53,19 @@ public void onSubscribe(Cancellable cancellable) {

@Override
public void onSuccess(@Nullable T result) {
resultConsumer.accept(result);
try {
resultConsumer.accept(result);
} catch (Throwable t) {
LOGGER.debug("Received exception from the result consumer {}.", resultConsumer, t);
}
}

@Override
public void onError(Throwable t) {
LOGGER.debug("Received exception from the source.", t);
if (errorConsumer != null) {
errorConsumer.accept(t);
} else {
LOGGER.debug("Received exception from the source.", t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1879,11 +1879,10 @@ protected final void subscribeInternal(Subscriber<? super T> subscriber) {
}

/**
* Subscribe to this {@link Single}, emits the result to the passed {@link Consumer} and log any
* {@link Subscriber#onError(Throwable)}.
*
* @param resultConsumer {@link Consumer} to accept the result of this {@link Single}.
* Subscribe to this {@link Single}, emit the result to the passed {@link Consumer} and log any
* {@link Subscriber#onError(Throwable)} at debug level.
*
* @param resultConsumer {@link Consumer} to accept the result of this {@link Single} if it succeeds.
* @return {@link Cancellable} used to invoke {@link Cancellable#cancel()} on the parameter of
* {@link Subscriber#onSubscribe(Cancellable)} for this {@link Single}.
*/
Expand All @@ -1893,6 +1892,21 @@ public final Cancellable subscribe(Consumer<? super T> resultConsumer) {
return subscriber;
}

/**
* Subscribe to this {@link Single}, emit the result or error to one of the passed {@link Consumer}s.
*
* @param resultConsumer {@link Consumer} to accept the result of this {@link Single} if it succeeds.
* @param errorConsumer {@link Consumer} to accept the error of this {@link Single} if it fails.
* @return {@link Cancellable} used to invoke {@link Cancellable#cancel()} on the parameter of
* {@link Subscriber#onSubscribe(Cancellable)} for this {@link Single}.
*/
public final Cancellable subscribe(Consumer<? super T> resultConsumer,
Consumer<? super Throwable> errorConsumer) {
SimpleSingleSubscriber<T> subscriber = new SimpleSingleSubscriber<>(resultConsumer, errorConsumer);
subscribeInternal(subscriber);
return subscriber;
}

/**
* Handles a subscriber to this {@link Single}.
*
Expand Down

0 comments on commit 7a05d7d

Please sign in to comment.