Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single/Completable subscribe overloads to handle failures #3112

Merged
merged 2 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,21 @@ public final Cancellable subscribe(Runnable onComplete) {
return subscriber;
}

/**
* Subscribe to this {@link Completable}, invoke passed {@link Runnable} when this {@link Completable} terminates
* successfully or emit an error to {@link Consumer} when it fails.
*
* @param onComplete {@link Runnable} to invoke when this {@link Completable} terminates successfully.
* @param errorConsumer {@link Consumer} to accept the error when this {@link Completable} fails.
* @return {@link Cancellable} used to invoke {@link Cancellable#cancel()} on the parameter of
* {@link Subscriber#onSubscribe(Cancellable)} for this {@link Completable}.
*/
public final Cancellable subscribe(Runnable onComplete, Consumer<? super Throwable> errorConsumer) {
SimpleCompletableSubscriber subscriber = new SimpleCompletableSubscriber(onComplete, errorConsumer);
subscribeInternal(subscriber);
return subscriber;
}

/**
* Handles a subscriber to this {@code Completable}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,31 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Consumer;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;

final class SimpleCompletableSubscriber extends SequentialCancellable implements CompletableSource.Subscriber {

private static final Runnable NOOP_RUNNABLE = () -> { };
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleCompletableSubscriber.class);
private static final Runnable NOOP_RUNNABLE = () -> { };
private final Runnable onComplete;
@Nullable
private final Consumer<? super Throwable> errorConsumer;

SimpleCompletableSubscriber() {
this(NOOP_RUNNABLE);
}

SimpleCompletableSubscriber(final Runnable onComplete) {
this(onComplete, null);
}

SimpleCompletableSubscriber(final Runnable onComplete,
@Nullable final Consumer<? super Throwable> errorConsumer) {
this.onComplete = requireNonNull(onComplete);
this.errorConsumer = errorConsumer;
}

@Override
Expand All @@ -54,6 +65,10 @@ public void onComplete() {

@Override
public void onError(Throwable t) {
LOGGER.debug("Received exception from the source.", t);
if (errorConsumer != null) {
errorConsumer.accept(t);
} else {
LOGGER.debug("Received exception from the source.", t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,17 @@ final class SimpleSingleSubscriber<T> extends SequentialCancellable implements S
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSingleSubscriber.class);

private final Consumer<? super T> resultConsumer;
@Nullable
private final Consumer<? super Throwable> errorConsumer;

SimpleSingleSubscriber(Consumer<? super T> resultConsumer) {
SimpleSingleSubscriber(final Consumer<? super T> resultConsumer) {
this(resultConsumer, null);
}

SimpleSingleSubscriber(final Consumer<? super T> resultConsumer,
@Nullable final Consumer<? super Throwable> errorConsumer) {
this.resultConsumer = requireNonNull(resultConsumer);
this.errorConsumer = errorConsumer;
}

@Override
Expand All @@ -44,11 +52,19 @@ public void onSubscribe(Cancellable cancellable) {

@Override
public void onSuccess(@Nullable T result) {
resultConsumer.accept(result);
try {
resultConsumer.accept(result);
} catch (Throwable t) {
LOGGER.debug("Received exception from the result consumer {}.", resultConsumer, t);
}
}

@Override
public void onError(Throwable t) {
LOGGER.debug("Received exception from the source.", t);
if (errorConsumer != null) {
errorConsumer.accept(t);
} else {
LOGGER.debug("Received exception from the source.", t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1879,11 +1879,10 @@ protected final void subscribeInternal(Subscriber<? super T> subscriber) {
}

/**
* Subscribe to this {@link Single}, emits the result to the passed {@link Consumer} and log any
* {@link Subscriber#onError(Throwable)}.
*
* @param resultConsumer {@link Consumer} to accept the result of this {@link Single}.
* Subscribe to this {@link Single}, emit the result to the passed {@link Consumer} and log any
* {@link Subscriber#onError(Throwable)} at debug level.
*
* @param resultConsumer {@link Consumer} to accept the result of this {@link Single} if it succeeds.
* @return {@link Cancellable} used to invoke {@link Cancellable#cancel()} on the parameter of
* {@link Subscriber#onSubscribe(Cancellable)} for this {@link Single}.
*/
Expand All @@ -1893,6 +1892,21 @@ public final Cancellable subscribe(Consumer<? super T> resultConsumer) {
return subscriber;
}

/**
* Subscribe to this {@link Single}, emit the result or error to one of the passed {@link Consumer}s.
*
* @param resultConsumer {@link Consumer} to accept the result of this {@link Single} if it succeeds.
* @param errorConsumer {@link Consumer} to accept the error of this {@link Single} if it fails.
* @return {@link Cancellable} used to invoke {@link Cancellable#cancel()} on the parameter of
* {@link Subscriber#onSubscribe(Cancellable)} for this {@link Single}.
*/
public final Cancellable subscribe(Consumer<? super T> resultConsumer,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 consumers vs bi-consumer: I picked the way that exists in both rxjava (for Single and Completable) and reactor (for Mono) to be as close as possible to other implementations. We can add BiConsumer later if there is an ask.

Consumer<? super Throwable> errorConsumer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to enforce that this helper doesn't receive a null errorConsumer? If not, we might as well funnel the subscribe(Consumer<? super T>) variant through this method.
The same question applies to Completable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, changed ctors to use requireNonNull when we expect errorConsumer

SimpleSingleSubscriber<T> subscriber = new SimpleSingleSubscriber<>(resultConsumer, errorConsumer);
subscribeInternal(subscriber);
return subscriber;
}

/**
* Handles a subscriber to this {@link Single}.
*
Expand Down
Loading