Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publisher, Single, Completable onError* operators #1435

Merged
merged 8 commits into from
Mar 18, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.function.Function;
import java.util.function.Supplier;

import static io.servicetalk.concurrent.api.Completable.failed;
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
import static java.util.function.Function.identity;

Expand All @@ -46,9 +45,8 @@ private AsyncCloseables() {
* @return A {@link Completable} that is notified once the close is complete.
*/
public static Completable closeAsyncGracefully(AsyncCloseable closable, long timeout, TimeUnit timeoutUnit) {
return closable.closeAsyncGracefully().timeout(timeout, timeoutUnit).onErrorResume(
t -> t instanceof TimeoutException ? closable.closeAsync() : failed(t)
);
return closable.closeAsyncGracefully().timeout(timeout, timeoutUnit).onErrorResume(TimeoutException.class,
t -> closable.closeAsync());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static io.servicetalk.concurrent.api.CompletableDoOnUtils.doOnCompleteSupplier;
Expand Down Expand Up @@ -103,6 +104,147 @@ protected Completable() {
// Operators Begin
//

/**
* Transform errors emitted on this {@link Completable} into a {@link Subscriber#onComplete()} signal
* (e.g. swallows the error).
* <p>
* This method provides a data transformation in sequential programming similar to:
* <pre>{@code
* try {
* resultOfThisCompletable();
* } catch (Throwable cause) {
* // ignored
* }
* }</pre>
* @return A {@link Completable} which transform errors emitted on this {@link Completable} into a
* {@link Subscriber#onComplete()} signal (e.g. swallows the error).
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX catch operator.</a>
*/
public final Completable onErrorComplete() {
return onErrorComplete(t -> true);
}

/**
* Transform errors emitted on this {@link Completable} which match {@code type} into a
* {@link Subscriber#onComplete()} signal (e.g. swallows the error).
* <p>
* This method provides a data transformation in sequential programming similar to:
* <pre>{@code
* try {
* resultOfThisCompletable();
* } catch (Throwable cause) {
* if (!type.isInstance(cause)) {
* throw cause;
* }
* }
* }</pre>
* @param type The {@link Throwable} type to filter, operator will not apply for errors which don't match this type.
* @param <E> The {@link Throwable} type.
* @return A {@link Completable} which transform errors emitted on this {@link Completable} which match {@code type}
* into a {@link Subscriber#onComplete()} signal (e.g. swallows the error).
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX catch operator.</a>
*/
public final <E extends Throwable> Completable onErrorComplete(Class<E> type) {
return onErrorComplete(type::isInstance);
}

/**
* Transform errors emitted on this {@link Completable} which match {@code predicate} into a
* {@link Subscriber#onComplete()} signal (e.g. swallows the error).
* <p>
* This method provides a data transformation in sequential programming similar to:
* <pre>{@code
* try {
* resultOfThisCompletable();
* } catch (Throwable cause) {
* if (!predicate.test(cause)) {
* throw cause;
* }
* }
* }</pre>
* @param predicate returns {@code true} if the {@link Throwable} should be transformed to and
* {@link Subscriber#onComplete()} signal. Returns {@code false} to propagate the error.
* @return A {@link Completable} which transform errors emitted on this {@link Completable} which match
* {@code predicate} into a {@link Subscriber#onComplete()} signal (e.g. swallows the error).
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX catch operator.</a>
*/
public final Completable onErrorComplete(Predicate<? super Throwable> predicate) {
return new OnErrorCompleteCompletable(this, predicate, executor);
}

/**
* Transform errors emitted on this {@link Completable} into a different error.
* <p>
* This method provides a data transformation in sequential programming similar to:
* <pre>{@code
* try {
* resultOfThisCompletable();
* } catch (Throwable cause) {
* throw mapper.apply(cause);
* }
* }</pre>
* @param mapper returns the error used to terminate the returned {@link Completable}.
* @return A {@link Completable} which transform errors emitted on this {@link Completable} into a different error.
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX catch operator.</a>
*/
public final Completable onErrorMap(Function<? super Throwable, ? extends Throwable> mapper) {
return onErrorMap(t -> true, mapper);
}

/**
* Transform errors emitted on this {@link Completable} which match {@code type} into a different error.
* <p>
* This method provides a data transformation in sequential programming similar to:
* <pre>{@code
* try {
* resultOfThisCompletable();
* } catch (Throwable cause) {
* if (type.isInstance(cause)) {
* throw mapper.apply(cause);
* } else {
* throw cause;
* }
* }
* }</pre>
* @param type The {@link Throwable} type to filter, operator will not apply for errors which don't match this type.
* @param mapper returns the error used to terminate the returned {@link Completable}.
* @param <E> The type of {@link Throwable} to transform.
* @return A {@link Completable} which transform errors emitted on this {@link Completable} into a different error.
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX catch operator.</a>
*/
public final <E extends Throwable> Completable onErrorMap(
Class<E> type, Function<? super E, ? extends Throwable> mapper) {
@SuppressWarnings("unchecked")
final Function<Throwable, Throwable> rawMapper = (Function<Throwable, Throwable>) mapper;
return onErrorMap(type::isInstance, rawMapper);
}

/**
* Transform errors emitted on this {@link Completable} which match {@code predicate} into a different error.
* <p>
* This method provides a data transformation in sequential programming similar to:
* <pre>{@code
* try {
* resultOfThisCompletable();
* } catch (Throwable cause) {
* if (predicate.test(cause)) {
* throw mapper.apply(cause);
* } else {
* throw cause;
* }
* }
* }</pre>
* @param predicate returns {@code true} if the {@link Throwable} should be transformed via {@code mapper}. Returns
* {@code false} to propagate the original error.
* @param mapper returns the error used to terminate the returned {@link Completable}.
* @return A {@link Completable} which transform errors emitted on this {@link Completable} into a different error.
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX catch operator.</a>
*/
public final Completable onErrorMap(Predicate<? super Throwable> predicate,
Function<? super Throwable, ? extends Throwable> mapper) {
return new OnErrorMapCompletable(this, predicate, mapper, executor);
}

/**
* Recover from any error emitted by this {@link Completable} by using another {@link Completable} provided by the
* passed {@code nextFactory}.
Expand All @@ -118,11 +260,74 @@ protected Completable() {
* }</pre>
*
* @param nextFactory Returns the next {@link Completable}, if this {@link Completable} emits an error.
* @return A {@link Completable} that recovers from an error from this {@code Completable} by using another
* @return A {@link Completable} that recovers from an error from this {@link Completable} by using another
* {@link Completable} provided by the passed {@code nextFactory}.
*/
public final Completable onErrorResume(Function<? super Throwable, ? extends Completable> nextFactory) {
return onErrorResume(t -> true, nextFactory);
}

/**
* Recover from errors emitted by this {@link Completable} which match {@code type} by using another
* {@link Completable} provided by the passed {@code nextFactory}.
* <p>
* This method provides similar capabilities to a try/catch block in sequential programming:
* <pre>{@code
* try {
* resultOfThisCompletable();
* } catch (Throwable cause) {
* if (type.isInstance(cause)) {
* // Note nextFactory returning a error Completable is like re-throwing (nextFactory shouldn't throw).
* results = nextFactory.apply(cause);
* } else {
* throw cause;
* }
* }
* }</pre>
*
* @param type The {@link Throwable} type to filter, operator will not apply for errors which don't match this type.
* @param nextFactory Returns the next {@link Completable}, when this {@link Completable} emits an error.
* @param <E> The type of {@link Throwable} to transform.
* @return A {@link Completable} that recovers from an error from this {@code Publisher} by using another
* {@link Completable} provided by the passed {@code nextFactory}.
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX catch operator.</a>
*/
public final <E extends Throwable> Completable onErrorResume(
Class<E> type, Function<? super E, ? extends Completable> nextFactory) {
@SuppressWarnings("unchecked")
Function<Throwable, ? extends Completable> rawNextFactory =
(Function<Throwable, ? extends Completable>) nextFactory;
return onErrorResume(type::isInstance, rawNextFactory);
}

/**
* Recover from errors emitted by this {@link Completable} which match {@code predicate} by using another
* {@link Completable} provided by the passed {@code nextFactory}.
* <p>
* This method provides similar capabilities to a try/catch block in sequential programming:
* <pre>{@code
* try {
* resultOfThisCompletable();
* } catch (Throwable cause) {
* if (predicate.test(cause)) {
* // Note that nextFactory returning a error Publisher is like re-throwing (nextFactory shouldn't throw).
* results = nextFactory.apply(cause);
* } else {
* throw cause;
* }
* }
* }</pre>
*
* @param predicate returns {@code true} if the {@link Throwable} should be transformed via {@code nextFactory}.
* Returns {@code false} to propagate the original error.
* @param nextFactory Returns the next {@link Completable}, when this {@link Completable} emits an error.
* @return A {@link Completable} that recovers from an error from this {@link Completable} by using another
* {@link Completable} provided by the passed {@code nextFactory}.
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX catch operator.</a>
*/
public final Completable onErrorResume(Function<Throwable, ? extends Completable> nextFactory) {
return new ResumeCompletable(this, nextFactory, executor);
public final Completable onErrorResume(Predicate<? super Throwable> predicate,
Function<? super Throwable, ? extends Completable> nextFactory) {
return new OnErrorResumeCompletable(this, predicate, nextFactory, executor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,28 +123,28 @@ private void mergeCloseableDelayError(final List<AsyncCloseable> closeables) {
}

private void concatCloseableDelayError(final AsyncCloseable closeable) {
closeAsync = closeAsync.concat(closeable.closeAsync().onErrorResume(th -> {
closeAsync = closeAsync.concat(closeable.closeAsync().onErrorComplete(th -> {
//TODO: This should use concatDelayError when available.
LOGGER.debug("Ignored failure to close {}.", closeable, th);
return completed();
return true;
}));
closeAsyncGracefully = closeAsyncGracefully.concat(closeable.closeAsyncGracefully().onErrorResume(th -> {
closeAsyncGracefully = closeAsyncGracefully.concat(closeable.closeAsyncGracefully().onErrorComplete(th -> {
//TODO: This should use concatDelayError when available.
LOGGER.debug("Ignored failure to close {}.", closeable, th);
return completed();
return true;
}));
}

private void prependCloseableDelayError(final AsyncCloseable closeable) {
closeAsync = closeable.closeAsync().onErrorResume(th -> {
closeAsync = closeable.closeAsync().onErrorComplete(th -> {
//TODO: This should use prependDelayError when available.
LOGGER.debug("Ignored failure to close {}.", closeable, th);
return completed();
return true;
}).concat(closeAsync);
closeAsyncGracefully = closeable.closeAsyncGracefully().onErrorResume(th -> {
closeAsyncGracefully = closeable.closeAsyncGracefully().onErrorComplete(th -> {
//TODO: This should use prependDelayError when available.
LOGGER.debug("Ignored failure to close {}.", closeable, th);
return completed();
return true;
}).concat(closeAsyncGracefully);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;

import java.util.function.Predicate;

import static java.util.Objects.requireNonNull;

final class OnErrorCompleteCompletable extends AbstractSynchronousCompletableOperator {
private final Predicate<? super Throwable> predicate;

OnErrorCompleteCompletable(final Completable original, Predicate<? super Throwable> predicate,
final Executor executor) {
super(original, executor);
this.predicate = requireNonNull(predicate);
}

@Override
public Subscriber apply(final Subscriber subscriber) {
return new OnErrorCompleteSubscriber(subscriber, predicate);
}

private static final class OnErrorCompleteSubscriber implements Subscriber {
private final Subscriber subscriber;
private final Predicate<? super Throwable> predicate;

private OnErrorCompleteSubscriber(final Subscriber subscriber, final Predicate<? super Throwable> predicate) {
this.subscriber = subscriber;
this.predicate = predicate;
}

@Override
public void onSubscribe(final Cancellable cancellable) {
subscriber.onSubscribe(cancellable);
}

@Override
public void onComplete() {
subscriber.onComplete();
}

@Override
public void onError(final Throwable t) {
final boolean predicateResult;
try {
predicateResult = predicate.test(t);
} catch (Throwable cause) {
subscriber.onError(cause);
return;
}

if (predicateResult) {
subscriber.onComplete();
} else {
subscriber.onError(t);
}
}
}
}
Loading