From 34242e14b6c6d5b31c2dfce17e65410dd2f20d8a Mon Sep 17 00:00:00 2001 From: Niklas Baudy Date: Fri, 8 Dec 2017 00:02:47 +0100 Subject: [PATCH] 2.x: Add retry(times, predicate) to Single & Completable and verify behavior across them and Maybe. (#5753) * 2.x: Add retry(times, predicate) to Single & Completable and verify behavior across them and Maybe. * Adjust ParamValidationCheckerTest for the 2 newly added methods. * Add AtomicInteger for checking the number of subscribe calls. * Fix copy pasta mistake. --- src/main/java/io/reactivex/Completable.java | 22 ++++ src/main/java/io/reactivex/Single.java | 20 +++ .../reactivex/ParamValidationCheckerTest.java | 2 + .../completable/CompletableRetryTest.java | 115 +++++++++++++++++ .../io/reactivex/maybe/MaybeRetryTest.java | 121 ++++++++++++++++++ .../io/reactivex/single/SingleRetryTest.java | 121 ++++++++++++++++++ 6 files changed, 401 insertions(+) create mode 100644 src/test/java/io/reactivex/completable/CompletableRetryTest.java create mode 100644 src/test/java/io/reactivex/maybe/MaybeRetryTest.java create mode 100644 src/test/java/io/reactivex/single/SingleRetryTest.java diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 34008f1a0c..172e3caab6 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -1545,6 +1545,28 @@ public final Completable retry(long times) { return fromPublisher(toFlowable().retry(times)); } + /** + * Returns a Completable that when this Completable emits an error, retries at most times + * or until the predicate returns false, whichever happens first and emitting the last error. + *
+ *
Scheduler:
+ *
{@code retry} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param times the number of times the returned Completable should retry this Completable + * @param predicate the predicate that is called with the latest throwable and should return + * true to indicate the returned Completable should resubscribe to this Completable. + * @return the new Completable instance + * @throws NullPointerException if predicate is null + * @throws IllegalArgumentException if times is negative + * @since 2.1.8 - experimental + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final Completable retry(long times, Predicate predicate) { + return fromPublisher(toFlowable().retry(times, predicate)); + } + /** * Returns a Completable that when this Completable emits an error, calls the given predicate with * the latest exception to decide whether to resubscribe to this or not. diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 72ee2872c6..df59da1ff5 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -2649,6 +2649,26 @@ public final Single retry(BiPredicate pre return toSingle(toFlowable().retry(predicate)); } + /** + * Repeatedly re-subscribe at most times or until the predicate returns false, whichever happens first + * if it fails with an onError. + *
+ *
Scheduler:
+ *
{@code retry} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param times the number of times to resubscribe if the current Single fails + * @param predicate the predicate called with the failure Throwable + * and should return true if a resubscription should happen + * @return the new Single instance + * @since 2.1.8 - experimental + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final Single retry(long times, Predicate predicate) { + return toSingle(toFlowable().retry(times, predicate)); + } + /** * Re-subscribe to the current Single if the given predicate returns true when the Single fails * with an onError. diff --git a/src/test/java/io/reactivex/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/ParamValidationCheckerTest.java index 5ad5afbd96..d129d14aa4 100644 --- a/src/test/java/io/reactivex/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/ParamValidationCheckerTest.java @@ -258,6 +258,7 @@ public void checkParallelFlowable() { // zero retry is allowed addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE)); + addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class)); // negative time is considered as zero time addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "blockingGet", Long.TYPE, TimeUnit.class)); @@ -323,6 +324,7 @@ public void checkParallelFlowable() { // zero retry is allowed addOverride(new ParamOverride(Single.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE)); + addOverride(new ParamOverride(Single.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class)); // negative time is considered as zero time addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class)); diff --git a/src/test/java/io/reactivex/completable/CompletableRetryTest.java b/src/test/java/io/reactivex/completable/CompletableRetryTest.java new file mode 100644 index 0000000000..23619078bd --- /dev/null +++ b/src/test/java/io/reactivex/completable/CompletableRetryTest.java @@ -0,0 +1,115 @@ +/** + * Copyright (c) 2017-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.completable; + +import io.reactivex.Completable; +import io.reactivex.functions.Action; +import io.reactivex.functions.Predicate; +import io.reactivex.internal.functions.Functions; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class CompletableRetryTest { + @Test + public void retryTimesPredicateWithMatchingPredicate() { + final AtomicInteger atomicInteger = new AtomicInteger(3); + final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0); + + Completable.fromAction(new Action() { + @Override public void run() throws Exception { + numberOfSubscribeCalls.incrementAndGet(); + + if (atomicInteger.decrementAndGet() != 0) { + throw new RuntimeException(); + } + + throw new IllegalArgumentException(); + } + }) + .retry(Integer.MAX_VALUE, new Predicate() { + @Override public boolean test(final Throwable throwable) throws Exception { + return !(throwable instanceof IllegalArgumentException); + } + }) + .test() + .assertFailure(IllegalArgumentException.class); + + assertEquals(3, numberOfSubscribeCalls.get()); + } + + @Test + public void retryTimesPredicateWithMatchingRetryAmount() { + final AtomicInteger atomicInteger = new AtomicInteger(3); + final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0); + + Completable.fromAction(new Action() { + @Override public void run() throws Exception { + numberOfSubscribeCalls.incrementAndGet(); + + if (atomicInteger.decrementAndGet() != 0) { + throw new RuntimeException(); + } + } + }) + .retry(2, Functions.alwaysTrue()) + .test() + .assertResult(); + + assertEquals(3, numberOfSubscribeCalls.get()); + } + + @Test + public void retryTimesPredicateWithNotMatchingRetryAmount() { + final AtomicInteger atomicInteger = new AtomicInteger(3); + final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0); + + Completable.fromAction(new Action() { + @Override public void run() throws Exception { + numberOfSubscribeCalls.incrementAndGet(); + + if (atomicInteger.decrementAndGet() != 0) { + throw new RuntimeException(); + } + } + }) + .retry(1, Functions.alwaysTrue()) + .test() + .assertFailure(RuntimeException.class); + + assertEquals(2, numberOfSubscribeCalls.get()); + } + + @Test + public void retryTimesPredicateWithZeroRetries() { + final AtomicInteger atomicInteger = new AtomicInteger(2); + final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0); + + Completable.fromAction(new Action() { + @Override public void run() throws Exception { + numberOfSubscribeCalls.incrementAndGet(); + + if (atomicInteger.decrementAndGet() != 0) { + throw new RuntimeException(); + } + } + }) + .retry(0, Functions.alwaysTrue()) + .test() + .assertFailure(RuntimeException.class); + + assertEquals(1, numberOfSubscribeCalls.get()); + } +} diff --git a/src/test/java/io/reactivex/maybe/MaybeRetryTest.java b/src/test/java/io/reactivex/maybe/MaybeRetryTest.java new file mode 100644 index 0000000000..0749da742d --- /dev/null +++ b/src/test/java/io/reactivex/maybe/MaybeRetryTest.java @@ -0,0 +1,121 @@ +/** + * Copyright (c) 2017-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.maybe; + +import io.reactivex.Maybe; +import io.reactivex.functions.Predicate; +import io.reactivex.internal.functions.Functions; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class MaybeRetryTest { + @Test + public void retryTimesPredicateWithMatchingPredicate() { + final AtomicInteger atomicInteger = new AtomicInteger(3); + final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0); + + Maybe.fromCallable(new Callable() { + @Override public Boolean call() throws Exception { + numberOfSubscribeCalls.incrementAndGet(); + + if (atomicInteger.decrementAndGet() != 0) { + throw new RuntimeException(); + } + + throw new IllegalArgumentException(); + } + }) + .retry(Integer.MAX_VALUE, new Predicate() { + @Override public boolean test(final Throwable throwable) throws Exception { + return !(throwable instanceof IllegalArgumentException); + } + }) + .test() + .assertFailure(IllegalArgumentException.class); + + assertEquals(3, numberOfSubscribeCalls.get()); + } + + @Test + public void retryTimesPredicateWithMatchingRetryAmount() { + final AtomicInteger atomicInteger = new AtomicInteger(3); + final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0); + + Maybe.fromCallable(new Callable() { + @Override public Boolean call() throws Exception { + numberOfSubscribeCalls.incrementAndGet(); + + if (atomicInteger.decrementAndGet() != 0) { + throw new RuntimeException(); + } + + return true; + } + }) + .retry(2, Functions.alwaysTrue()) + .test() + .assertResult(true); + + assertEquals(3, numberOfSubscribeCalls.get()); + } + + @Test + public void retryTimesPredicateWithNotMatchingRetryAmount() { + final AtomicInteger atomicInteger = new AtomicInteger(3); + final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0); + + Maybe.fromCallable(new Callable() { + @Override public Boolean call() throws Exception { + numberOfSubscribeCalls.incrementAndGet(); + + if (atomicInteger.decrementAndGet() != 0) { + throw new RuntimeException(); + } + + return true; + } + }) + .retry(1, Functions.alwaysTrue()) + .test() + .assertFailure(RuntimeException.class); + + assertEquals(2, numberOfSubscribeCalls.get()); + } + + @Test + public void retryTimesPredicateWithZeroRetries() { + final AtomicInteger atomicInteger = new AtomicInteger(2); + final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0); + + Maybe.fromCallable(new Callable() { + @Override public Boolean call() throws Exception { + numberOfSubscribeCalls.incrementAndGet(); + + if (atomicInteger.decrementAndGet() != 0) { + throw new RuntimeException(); + } + + return true; + } + }) + .retry(0, Functions.alwaysTrue()) + .test() + .assertFailure(RuntimeException.class); + + assertEquals(1, numberOfSubscribeCalls.get()); + } +} diff --git a/src/test/java/io/reactivex/single/SingleRetryTest.java b/src/test/java/io/reactivex/single/SingleRetryTest.java new file mode 100644 index 0000000000..c37f877f03 --- /dev/null +++ b/src/test/java/io/reactivex/single/SingleRetryTest.java @@ -0,0 +1,121 @@ +/** + * Copyright (c) 2017-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.single; + +import io.reactivex.Single; +import io.reactivex.functions.Predicate; +import io.reactivex.internal.functions.Functions; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class SingleRetryTest { + @Test + public void retryTimesPredicateWithMatchingPredicate() { + final AtomicInteger atomicInteger = new AtomicInteger(3); + final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0); + + Single.fromCallable(new Callable() { + @Override public Boolean call() throws Exception { + numberOfSubscribeCalls.incrementAndGet(); + + if (atomicInteger.decrementAndGet() != 0) { + throw new RuntimeException(); + } + + throw new IllegalArgumentException(); + } + }) + .retry(Integer.MAX_VALUE, new Predicate() { + @Override public boolean test(final Throwable throwable) throws Exception { + return !(throwable instanceof IllegalArgumentException); + } + }) + .test() + .assertFailure(IllegalArgumentException.class); + + assertEquals(3, numberOfSubscribeCalls.get()); + } + + @Test + public void retryTimesPredicateWithMatchingRetryAmount() { + final AtomicInteger atomicInteger = new AtomicInteger(3); + final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0); + + Single.fromCallable(new Callable() { + @Override public Boolean call() throws Exception { + numberOfSubscribeCalls.incrementAndGet(); + + if (atomicInteger.decrementAndGet() != 0) { + throw new RuntimeException(); + } + + return true; + } + }) + .retry(2, Functions.alwaysTrue()) + .test() + .assertResult(true); + + assertEquals(3, numberOfSubscribeCalls.get()); + } + + @Test + public void retryTimesPredicateWithNotMatchingRetryAmount() { + final AtomicInteger atomicInteger = new AtomicInteger(3); + final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0); + + Single.fromCallable(new Callable() { + @Override public Boolean call() throws Exception { + numberOfSubscribeCalls.incrementAndGet(); + + if (atomicInteger.decrementAndGet() != 0) { + throw new RuntimeException(); + } + + return true; + } + }) + .retry(1, Functions.alwaysTrue()) + .test() + .assertFailure(RuntimeException.class); + + assertEquals(2, numberOfSubscribeCalls.get()); + } + + @Test + public void retryTimesPredicateWithZeroRetries() { + final AtomicInteger atomicInteger = new AtomicInteger(2); + final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0); + + Single.fromCallable(new Callable() { + @Override public Boolean call() throws Exception { + numberOfSubscribeCalls.incrementAndGet(); + + if (atomicInteger.decrementAndGet() != 0) { + throw new RuntimeException(); + } + + return true; + } + }) + .retry(0, Functions.alwaysTrue()) + .test() + .assertFailure(RuntimeException.class); + + assertEquals(1, numberOfSubscribeCalls.get()); + } +}