Skip to content

Commit

Permalink
2.x: Add retry(times, predicate) to Single & Completable and verify b…
Browse files Browse the repository at this point in the history
…ehavior across them and Maybe. (#5753)

* 2.x: Add retry(times, predicate) to Single & Completable and verify behavior across them and Maybe.

* Adjust ParamValidationCheckerTest for the 2 newly added methods.

* Add AtomicInteger for checking the number of subscribe calls.

* Fix copy pasta mistake.
  • Loading branch information
vanniktech authored and akarnokd committed Dec 7, 2017
1 parent ec40a5e commit 34242e1
Show file tree
Hide file tree
Showing 6 changed files with 401 additions and 0 deletions.
22 changes: 22 additions & 0 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1545,6 +1545,28 @@ public final Completable retry(long times) {
return fromPublisher(toFlowable().retry(times));
}

/**
* Returns a Completable that when this Completable emits an error, retries at most times
* or until the predicate returns false, whichever happens first and emitting the last error.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param times the number of times the returned Completable should retry this Completable
* @param predicate the predicate that is called with the latest throwable and should return
* true to indicate the returned Completable should resubscribe to this Completable.
* @return the new Completable instance
* @throws NullPointerException if predicate is null
* @throws IllegalArgumentException if times is negative
* @since 2.1.8 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retry(long times, Predicate<? super Throwable> predicate) {
return fromPublisher(toFlowable().retry(times, predicate));
}

/**
* Returns a Completable that when this Completable emits an error, calls the given predicate with
* the latest exception to decide whether to resubscribe to this or not.
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2649,6 +2649,26 @@ public final Single<T> retry(BiPredicate<? super Integer, ? super Throwable> pre
return toSingle(toFlowable().retry(predicate));
}

/**
* Repeatedly re-subscribe at most times or until the predicate returns false, whichever happens first
* if it fails with an onError.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param times the number of times to resubscribe if the current Single fails
* @param predicate the predicate called with the failure Throwable
* and should return true if a resubscription should happen
* @return the new Single instance
* @since 2.1.8 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retry(long times, Predicate<? super Throwable> predicate) {
return toSingle(toFlowable().retry(times, predicate));
}

/**
* Re-subscribe to the current Single if the given predicate returns true when the Single fails
* with an onError.
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/io/reactivex/ParamValidationCheckerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public void checkParallelFlowable() {

// zero retry is allowed
addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE));
addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class));

// negative time is considered as zero time
addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "blockingGet", Long.TYPE, TimeUnit.class));
Expand Down Expand Up @@ -323,6 +324,7 @@ public void checkParallelFlowable() {

// zero retry is allowed
addOverride(new ParamOverride(Single.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE));
addOverride(new ParamOverride(Single.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class));

// negative time is considered as zero time
addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class));
Expand Down
115 changes: 115 additions & 0 deletions src/test/java/io/reactivex/completable/CompletableRetryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Copyright (c) 2017-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.completable;

import io.reactivex.Completable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Predicate;
import io.reactivex.internal.functions.Functions;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class CompletableRetryTest {
@Test
public void retryTimesPredicateWithMatchingPredicate() {
final AtomicInteger atomicInteger = new AtomicInteger(3);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Completable.fromAction(new Action() {
@Override public void run() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}

throw new IllegalArgumentException();
}
})
.retry(Integer.MAX_VALUE, new Predicate<Throwable>() {
@Override public boolean test(final Throwable throwable) throws Exception {
return !(throwable instanceof IllegalArgumentException);
}
})
.test()
.assertFailure(IllegalArgumentException.class);

assertEquals(3, numberOfSubscribeCalls.get());
}

@Test
public void retryTimesPredicateWithMatchingRetryAmount() {
final AtomicInteger atomicInteger = new AtomicInteger(3);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Completable.fromAction(new Action() {
@Override public void run() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}
}
})
.retry(2, Functions.alwaysTrue())
.test()
.assertResult();

assertEquals(3, numberOfSubscribeCalls.get());
}

@Test
public void retryTimesPredicateWithNotMatchingRetryAmount() {
final AtomicInteger atomicInteger = new AtomicInteger(3);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Completable.fromAction(new Action() {
@Override public void run() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}
}
})
.retry(1, Functions.alwaysTrue())
.test()
.assertFailure(RuntimeException.class);

assertEquals(2, numberOfSubscribeCalls.get());
}

@Test
public void retryTimesPredicateWithZeroRetries() {
final AtomicInteger atomicInteger = new AtomicInteger(2);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Completable.fromAction(new Action() {
@Override public void run() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}
}
})
.retry(0, Functions.alwaysTrue())
.test()
.assertFailure(RuntimeException.class);

assertEquals(1, numberOfSubscribeCalls.get());
}
}
121 changes: 121 additions & 0 deletions src/test/java/io/reactivex/maybe/MaybeRetryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* Copyright (c) 2017-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.maybe;

import io.reactivex.Maybe;
import io.reactivex.functions.Predicate;
import io.reactivex.internal.functions.Functions;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class MaybeRetryTest {
@Test
public void retryTimesPredicateWithMatchingPredicate() {
final AtomicInteger atomicInteger = new AtomicInteger(3);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Maybe.fromCallable(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}

throw new IllegalArgumentException();
}
})
.retry(Integer.MAX_VALUE, new Predicate<Throwable>() {
@Override public boolean test(final Throwable throwable) throws Exception {
return !(throwable instanceof IllegalArgumentException);
}
})
.test()
.assertFailure(IllegalArgumentException.class);

assertEquals(3, numberOfSubscribeCalls.get());
}

@Test
public void retryTimesPredicateWithMatchingRetryAmount() {
final AtomicInteger atomicInteger = new AtomicInteger(3);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Maybe.fromCallable(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}

return true;
}
})
.retry(2, Functions.alwaysTrue())
.test()
.assertResult(true);

assertEquals(3, numberOfSubscribeCalls.get());
}

@Test
public void retryTimesPredicateWithNotMatchingRetryAmount() {
final AtomicInteger atomicInteger = new AtomicInteger(3);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Maybe.fromCallable(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}

return true;
}
})
.retry(1, Functions.alwaysTrue())
.test()
.assertFailure(RuntimeException.class);

assertEquals(2, numberOfSubscribeCalls.get());
}

@Test
public void retryTimesPredicateWithZeroRetries() {
final AtomicInteger atomicInteger = new AtomicInteger(2);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Maybe.fromCallable(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}

return true;
}
})
.retry(0, Functions.alwaysTrue())
.test()
.assertFailure(RuntimeException.class);

assertEquals(1, numberOfSubscribeCalls.get());
}
}
Loading

0 comments on commit 34242e1

Please sign in to comment.