diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java
index 34008f1a0c..172e3caab6 100644
--- a/src/main/java/io/reactivex/Completable.java
+++ b/src/main/java/io/reactivex/Completable.java
@@ -1545,6 +1545,28 @@ public final Completable retry(long times) {
return fromPublisher(toFlowable().retry(times));
}
+ /**
+ * Returns a Completable that when this Completable emits an error, retries at most times
+ * or until the predicate returns false, whichever happens first and emitting the last error.
+ *
+ * - Scheduler:
+ * - {@code retry} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param times the number of times the returned Completable should retry this Completable
+ * @param predicate the predicate that is called with the latest throwable and should return
+ * true to indicate the returned Completable should resubscribe to this Completable.
+ * @return the new Completable instance
+ * @throws NullPointerException if predicate is null
+ * @throws IllegalArgumentException if times is negative
+ * @since 2.1.8 - experimental
+ */
+ @Experimental
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final Completable retry(long times, Predicate super Throwable> predicate) {
+ return fromPublisher(toFlowable().retry(times, predicate));
+ }
+
/**
* Returns a Completable that when this Completable emits an error, calls the given predicate with
* the latest exception to decide whether to resubscribe to this or not.
diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java
index 72ee2872c6..df59da1ff5 100644
--- a/src/main/java/io/reactivex/Single.java
+++ b/src/main/java/io/reactivex/Single.java
@@ -2649,6 +2649,26 @@ public final Single retry(BiPredicate super Integer, ? super Throwable> pre
return toSingle(toFlowable().retry(predicate));
}
+ /**
+ * Repeatedly re-subscribe at most times or until the predicate returns false, whichever happens first
+ * if it fails with an onError.
+ *
+ * - Scheduler:
+ * - {@code retry} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param times the number of times to resubscribe if the current Single fails
+ * @param predicate the predicate called with the failure Throwable
+ * and should return true if a resubscription should happen
+ * @return the new Single instance
+ * @since 2.1.8 - experimental
+ */
+ @Experimental
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final Single retry(long times, Predicate super Throwable> predicate) {
+ return toSingle(toFlowable().retry(times, predicate));
+ }
+
/**
* Re-subscribe to the current Single if the given predicate returns true when the Single fails
* with an onError.
diff --git a/src/test/java/io/reactivex/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/ParamValidationCheckerTest.java
index 5ad5afbd96..d129d14aa4 100644
--- a/src/test/java/io/reactivex/ParamValidationCheckerTest.java
+++ b/src/test/java/io/reactivex/ParamValidationCheckerTest.java
@@ -258,6 +258,7 @@ public void checkParallelFlowable() {
// zero retry is allowed
addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE));
+ addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class));
// negative time is considered as zero time
addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "blockingGet", Long.TYPE, TimeUnit.class));
@@ -323,6 +324,7 @@ public void checkParallelFlowable() {
// zero retry is allowed
addOverride(new ParamOverride(Single.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE));
+ addOverride(new ParamOverride(Single.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class));
// negative time is considered as zero time
addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class));
diff --git a/src/test/java/io/reactivex/completable/CompletableRetryTest.java b/src/test/java/io/reactivex/completable/CompletableRetryTest.java
new file mode 100644
index 0000000000..23619078bd
--- /dev/null
+++ b/src/test/java/io/reactivex/completable/CompletableRetryTest.java
@@ -0,0 +1,115 @@
+/**
+ * Copyright (c) 2017-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.completable;
+
+import io.reactivex.Completable;
+import io.reactivex.functions.Action;
+import io.reactivex.functions.Predicate;
+import io.reactivex.internal.functions.Functions;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CompletableRetryTest {
+ @Test
+ public void retryTimesPredicateWithMatchingPredicate() {
+ final AtomicInteger atomicInteger = new AtomicInteger(3);
+ final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
+
+ Completable.fromAction(new Action() {
+ @Override public void run() throws Exception {
+ numberOfSubscribeCalls.incrementAndGet();
+
+ if (atomicInteger.decrementAndGet() != 0) {
+ throw new RuntimeException();
+ }
+
+ throw new IllegalArgumentException();
+ }
+ })
+ .retry(Integer.MAX_VALUE, new Predicate() {
+ @Override public boolean test(final Throwable throwable) throws Exception {
+ return !(throwable instanceof IllegalArgumentException);
+ }
+ })
+ .test()
+ .assertFailure(IllegalArgumentException.class);
+
+ assertEquals(3, numberOfSubscribeCalls.get());
+ }
+
+ @Test
+ public void retryTimesPredicateWithMatchingRetryAmount() {
+ final AtomicInteger atomicInteger = new AtomicInteger(3);
+ final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
+
+ Completable.fromAction(new Action() {
+ @Override public void run() throws Exception {
+ numberOfSubscribeCalls.incrementAndGet();
+
+ if (atomicInteger.decrementAndGet() != 0) {
+ throw new RuntimeException();
+ }
+ }
+ })
+ .retry(2, Functions.alwaysTrue())
+ .test()
+ .assertResult();
+
+ assertEquals(3, numberOfSubscribeCalls.get());
+ }
+
+ @Test
+ public void retryTimesPredicateWithNotMatchingRetryAmount() {
+ final AtomicInteger atomicInteger = new AtomicInteger(3);
+ final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
+
+ Completable.fromAction(new Action() {
+ @Override public void run() throws Exception {
+ numberOfSubscribeCalls.incrementAndGet();
+
+ if (atomicInteger.decrementAndGet() != 0) {
+ throw new RuntimeException();
+ }
+ }
+ })
+ .retry(1, Functions.alwaysTrue())
+ .test()
+ .assertFailure(RuntimeException.class);
+
+ assertEquals(2, numberOfSubscribeCalls.get());
+ }
+
+ @Test
+ public void retryTimesPredicateWithZeroRetries() {
+ final AtomicInteger atomicInteger = new AtomicInteger(2);
+ final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
+
+ Completable.fromAction(new Action() {
+ @Override public void run() throws Exception {
+ numberOfSubscribeCalls.incrementAndGet();
+
+ if (atomicInteger.decrementAndGet() != 0) {
+ throw new RuntimeException();
+ }
+ }
+ })
+ .retry(0, Functions.alwaysTrue())
+ .test()
+ .assertFailure(RuntimeException.class);
+
+ assertEquals(1, numberOfSubscribeCalls.get());
+ }
+}
diff --git a/src/test/java/io/reactivex/maybe/MaybeRetryTest.java b/src/test/java/io/reactivex/maybe/MaybeRetryTest.java
new file mode 100644
index 0000000000..0749da742d
--- /dev/null
+++ b/src/test/java/io/reactivex/maybe/MaybeRetryTest.java
@@ -0,0 +1,121 @@
+/**
+ * Copyright (c) 2017-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.maybe;
+
+import io.reactivex.Maybe;
+import io.reactivex.functions.Predicate;
+import io.reactivex.internal.functions.Functions;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MaybeRetryTest {
+ @Test
+ public void retryTimesPredicateWithMatchingPredicate() {
+ final AtomicInteger atomicInteger = new AtomicInteger(3);
+ final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
+
+ Maybe.fromCallable(new Callable() {
+ @Override public Boolean call() throws Exception {
+ numberOfSubscribeCalls.incrementAndGet();
+
+ if (atomicInteger.decrementAndGet() != 0) {
+ throw new RuntimeException();
+ }
+
+ throw new IllegalArgumentException();
+ }
+ })
+ .retry(Integer.MAX_VALUE, new Predicate() {
+ @Override public boolean test(final Throwable throwable) throws Exception {
+ return !(throwable instanceof IllegalArgumentException);
+ }
+ })
+ .test()
+ .assertFailure(IllegalArgumentException.class);
+
+ assertEquals(3, numberOfSubscribeCalls.get());
+ }
+
+ @Test
+ public void retryTimesPredicateWithMatchingRetryAmount() {
+ final AtomicInteger atomicInteger = new AtomicInteger(3);
+ final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
+
+ Maybe.fromCallable(new Callable() {
+ @Override public Boolean call() throws Exception {
+ numberOfSubscribeCalls.incrementAndGet();
+
+ if (atomicInteger.decrementAndGet() != 0) {
+ throw new RuntimeException();
+ }
+
+ return true;
+ }
+ })
+ .retry(2, Functions.alwaysTrue())
+ .test()
+ .assertResult(true);
+
+ assertEquals(3, numberOfSubscribeCalls.get());
+ }
+
+ @Test
+ public void retryTimesPredicateWithNotMatchingRetryAmount() {
+ final AtomicInteger atomicInteger = new AtomicInteger(3);
+ final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
+
+ Maybe.fromCallable(new Callable() {
+ @Override public Boolean call() throws Exception {
+ numberOfSubscribeCalls.incrementAndGet();
+
+ if (atomicInteger.decrementAndGet() != 0) {
+ throw new RuntimeException();
+ }
+
+ return true;
+ }
+ })
+ .retry(1, Functions.alwaysTrue())
+ .test()
+ .assertFailure(RuntimeException.class);
+
+ assertEquals(2, numberOfSubscribeCalls.get());
+ }
+
+ @Test
+ public void retryTimesPredicateWithZeroRetries() {
+ final AtomicInteger atomicInteger = new AtomicInteger(2);
+ final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
+
+ Maybe.fromCallable(new Callable() {
+ @Override public Boolean call() throws Exception {
+ numberOfSubscribeCalls.incrementAndGet();
+
+ if (atomicInteger.decrementAndGet() != 0) {
+ throw new RuntimeException();
+ }
+
+ return true;
+ }
+ })
+ .retry(0, Functions.alwaysTrue())
+ .test()
+ .assertFailure(RuntimeException.class);
+
+ assertEquals(1, numberOfSubscribeCalls.get());
+ }
+}
diff --git a/src/test/java/io/reactivex/single/SingleRetryTest.java b/src/test/java/io/reactivex/single/SingleRetryTest.java
new file mode 100644
index 0000000000..c37f877f03
--- /dev/null
+++ b/src/test/java/io/reactivex/single/SingleRetryTest.java
@@ -0,0 +1,121 @@
+/**
+ * Copyright (c) 2017-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.single;
+
+import io.reactivex.Single;
+import io.reactivex.functions.Predicate;
+import io.reactivex.internal.functions.Functions;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class SingleRetryTest {
+ @Test
+ public void retryTimesPredicateWithMatchingPredicate() {
+ final AtomicInteger atomicInteger = new AtomicInteger(3);
+ final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
+
+ Single.fromCallable(new Callable() {
+ @Override public Boolean call() throws Exception {
+ numberOfSubscribeCalls.incrementAndGet();
+
+ if (atomicInteger.decrementAndGet() != 0) {
+ throw new RuntimeException();
+ }
+
+ throw new IllegalArgumentException();
+ }
+ })
+ .retry(Integer.MAX_VALUE, new Predicate() {
+ @Override public boolean test(final Throwable throwable) throws Exception {
+ return !(throwable instanceof IllegalArgumentException);
+ }
+ })
+ .test()
+ .assertFailure(IllegalArgumentException.class);
+
+ assertEquals(3, numberOfSubscribeCalls.get());
+ }
+
+ @Test
+ public void retryTimesPredicateWithMatchingRetryAmount() {
+ final AtomicInteger atomicInteger = new AtomicInteger(3);
+ final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
+
+ Single.fromCallable(new Callable() {
+ @Override public Boolean call() throws Exception {
+ numberOfSubscribeCalls.incrementAndGet();
+
+ if (atomicInteger.decrementAndGet() != 0) {
+ throw new RuntimeException();
+ }
+
+ return true;
+ }
+ })
+ .retry(2, Functions.alwaysTrue())
+ .test()
+ .assertResult(true);
+
+ assertEquals(3, numberOfSubscribeCalls.get());
+ }
+
+ @Test
+ public void retryTimesPredicateWithNotMatchingRetryAmount() {
+ final AtomicInteger atomicInteger = new AtomicInteger(3);
+ final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
+
+ Single.fromCallable(new Callable() {
+ @Override public Boolean call() throws Exception {
+ numberOfSubscribeCalls.incrementAndGet();
+
+ if (atomicInteger.decrementAndGet() != 0) {
+ throw new RuntimeException();
+ }
+
+ return true;
+ }
+ })
+ .retry(1, Functions.alwaysTrue())
+ .test()
+ .assertFailure(RuntimeException.class);
+
+ assertEquals(2, numberOfSubscribeCalls.get());
+ }
+
+ @Test
+ public void retryTimesPredicateWithZeroRetries() {
+ final AtomicInteger atomicInteger = new AtomicInteger(2);
+ final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
+
+ Single.fromCallable(new Callable() {
+ @Override public Boolean call() throws Exception {
+ numberOfSubscribeCalls.incrementAndGet();
+
+ if (atomicInteger.decrementAndGet() != 0) {
+ throw new RuntimeException();
+ }
+
+ return true;
+ }
+ })
+ .retry(0, Functions.alwaysTrue())
+ .test()
+ .assertFailure(RuntimeException.class);
+
+ assertEquals(1, numberOfSubscribeCalls.get());
+ }
+}