Retry with Predicate #1214

Closed
benjchristensen opened this issue May 19, 2014 · 2 comments

@benjchristensen

Several people have expressed a need for conditional retry behavior. I suggest a new retry overload that takes a predicate which receives the Throwable and returns whether to retry.

public final Observable<T> retry(Func1<Throwable, Boolean> predicate)

This would let the caller decide whether to retry based on the Throwable and, if needed, on a retry count that the Func1 maintains itself.

Or we could use a Func2 and pass in the count:

public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate)
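
To make the intended semantics of the count-plus-Throwable predicate concrete, here is a minimal plain-Java sketch. It deliberately does not use RxJava: `RetrySketch`, its `retry` helper, and the `policy` predicate are hypothetical names introduced only for illustration, with `BiPredicate<Integer, Throwable>` standing in for the proposed `Func2<Integer, Throwable, Boolean>`. It assumes the count passed to the predicate is the number of failures seen so far, which the proposal above does not specify.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;

public class RetrySketch {

    // Hypothetical helper, not part of RxJava: runs the task and, on failure,
    // asks the predicate (failure count, error) whether to run it again.
    public static <T> T retry(Callable<T> task,
                              BiPredicate<Integer, Throwable> predicate) throws Exception {
        int failures = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                failures++;
                if (!predicate.test(failures, e)) {
                    throw e; // predicate declined: propagate the error
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();

        // Fails twice with a retryable error, then succeeds.
        Callable<String> flaky = () -> {
            if (calls.incrementAndGet() < 3) {
                throw new RuntimeException("retryable");
            }
            return "hello";
        };

        // Assumed policy: never retry IllegalStateException, give up after 5 failures.
        BiPredicate<Integer, Throwable> policy =
                (count, t) -> !(t instanceof IllegalStateException) && count <= 5;

        System.out.println(retry(flaky, policy));
    }
}
```

With the Func1 variant, the same policy would have to keep its own counter (for example in an AtomicInteger captured by the predicate), which is the trade-off between the two signatures.
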
@benjchristensen

Here is the type of thing that has to be done for conditional retries without this overload:

import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.Subscriber;

public class ConditionalRetry {

    public static void main(String[] args) {

        final AtomicInteger c = new AtomicInteger();
        Observable<String> oWithRuntimeException = Observable.create((Subscriber<? super String> s) -> {
            System.out.println("Execution: " + c.get());
            if (c.incrementAndGet() < 3) {
                s.onError(new RuntimeException("retryable"));
            } else {
                s.onNext("hello");
                s.onCompleted();
            }
        });

        final AtomicInteger c2 = new AtomicInteger();
        Observable<String> oWithIllegalStateException = Observable.create((Subscriber<? super String> s) -> {
            System.out.println("Execution: " + c2.get());
            if (c2.incrementAndGet() < 3) {
                s.onError(new RuntimeException("retryable"));
            } else {
                s.onError(new IllegalStateException());
            }
        });

        subscribe(oWithRuntimeException);
        subscribe(oWithIllegalStateException);
    }

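    public static void subscribe(Observable<String> o) {
        // Wrap each event in a Notification so errors can be inspected as data.
        o = o.materialize().flatMap(n -> {
            if (n.isOnError()) {
                if (n.getThrowable() instanceof IllegalStateException) {
                    // Non-retryable: pass the error notification through untouched.
                    return Observable.just(n);
                } else {
                    // Retryable: re-raise as a real error so retry() resubscribes.
                    return Observable.error(n.getThrowable());
                }
            } else {
                return Observable.just(n);
            }
        }).retry().dematerialize(); // unwrap notifications back into events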

        o.subscribe(System.out::println, t -> t.printStackTrace());
    }
}

@benjchristensen

Completed.
