Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add retry(times, predicate) to Single & Completable and verify behavior across them and Maybe. #5753

Merged
merged 4 commits into from
Dec 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1545,6 +1545,28 @@ public final Completable retry(long times) {
return fromPublisher(toFlowable().retry(times));
}

/**
* Returns a Completable that when this Completable emits an error, retries at most times
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Phrasing seems a little bit off here, something like "Returns a Completable that retries when …" sounds more natural I guess

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept this in sync with all of the other retry* java doc sentences.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uh, yeah all rx types have different phrasing for retry operator, alight

* or until the predicate returns false, whichever happens first and emitting the last error.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param times the number of times the returned Completable should retry this Completable
* @param predicate the predicate that is called with the latest throwable and should return
* true to indicate the returned Completable should resubscribe to this Completable.
* @return the new Completable instance
* @throws NullPointerException if predicate is null
* @throws IllegalArgumentException if times is negative
* @since 2.1.8 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retry(long times, Predicate<? super Throwable> predicate) {
return fromPublisher(toFlowable().retry(times, predicate));
}

/**
* Returns a Completable that when this Completable emits an error, calls the given predicate with
* the latest exception to decide whether to resubscribe to this or not.
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2649,6 +2649,26 @@ public final Single<T> retry(BiPredicate<? super Integer, ? super Throwable> pre
return toSingle(toFlowable().retry(predicate));
}

/**
* Repeatedly re-subscribe at most times or until the predicate returns false, whichever happens first
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make it more consistent with javadoc you've added for Completable? It didn't mention Repeatedly and was emphasizing error condition, while with this javadoc you only realize in the end of it that error is initial trigger for whole operator

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be yeah but if we only change this method then this one is inconsistent with all of the other Single methods.

* if it fails with an onError.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param times the number of times to resubscribe if the current Single fails
* @param predicate the predicate called with the failure Throwable
* and should return true if a resubscription should happen
* @return the new Single instance
* @since 2.1.8 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retry(long times, Predicate<? super Throwable> predicate) {
return toSingle(toFlowable().retry(times, predicate));
}

/**
* Re-subscribe to the current Single if the given predicate returns true when the Single fails
* with an onError.
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/io/reactivex/ParamValidationCheckerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public void checkParallelFlowable() {

// zero retry is allowed
addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE));
addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class));

// negative time is considered as zero time
addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "blockingGet", Long.TYPE, TimeUnit.class));
Expand Down Expand Up @@ -323,6 +324,7 @@ public void checkParallelFlowable() {

// zero retry is allowed
addOverride(new ParamOverride(Single.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE));
addOverride(new ParamOverride(Single.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class));

// negative time is considered as zero time
addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class));
Expand Down
115 changes: 115 additions & 0 deletions src/test/java/io/reactivex/completable/CompletableRetryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Copyright (c) 2017-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.completable;

import io.reactivex.Completable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Predicate;
import io.reactivex.internal.functions.Functions;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class CompletableRetryTest {
@Test
public void retryTimesPredicateWithMatchingPredicate() {
final AtomicInteger atomicInteger = new AtomicInteger(3);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Completable.fromAction(new Action() {
@Override public void run() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}

throw new IllegalArgumentException();
}
})
.retry(Integer.MAX_VALUE, new Predicate<Throwable>() {
@Override public boolean test(final Throwable throwable) throws Exception {
return !(throwable instanceof IllegalArgumentException);
}
})
.test()
.assertFailure(IllegalArgumentException.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, I believe you should check atomicInteger value here to verify actual number of action retries

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same below and in tests for other reactive types


assertEquals(3, numberOfSubscribeCalls.get());
}

@Test
public void retryTimesPredicateWithMatchingRetryAmount() {
final AtomicInteger atomicInteger = new AtomicInteger(3);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Completable.fromAction(new Action() {
@Override public void run() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}
}
})
.retry(2, Functions.alwaysTrue())
.test()
.assertResult();

assertEquals(3, numberOfSubscribeCalls.get());
}

@Test
public void retryTimesPredicateWithNotMatchingRetryAmount() {
final AtomicInteger atomicInteger = new AtomicInteger(3);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Completable.fromAction(new Action() {
@Override public void run() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}
}
})
.retry(1, Functions.alwaysTrue())
.test()
.assertFailure(RuntimeException.class);

assertEquals(2, numberOfSubscribeCalls.get());
}

@Test
public void retryTimesPredicateWithZeroRetries() {
final AtomicInteger atomicInteger = new AtomicInteger(2);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Completable.fromAction(new Action() {
@Override public void run() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}
}
})
.retry(0, Functions.alwaysTrue())
.test()
.assertFailure(RuntimeException.class);

assertEquals(1, numberOfSubscribeCalls.get());
}
}
121 changes: 121 additions & 0 deletions src/test/java/io/reactivex/maybe/MaybeRetryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* Copyright (c) 2017-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.maybe;

import io.reactivex.Maybe;
import io.reactivex.functions.Predicate;
import io.reactivex.internal.functions.Functions;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class MaybeRetryTest {
@Test
public void retryTimesPredicateWithMatchingPredicate() {
final AtomicInteger atomicInteger = new AtomicInteger(3);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Maybe.fromCallable(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}

throw new IllegalArgumentException();
}
})
.retry(Integer.MAX_VALUE, new Predicate<Throwable>() {
@Override public boolean test(final Throwable throwable) throws Exception {
return !(throwable instanceof IllegalArgumentException);
}
})
.test()
.assertFailure(IllegalArgumentException.class);

assertEquals(3, numberOfSubscribeCalls.get());
}

@Test
public void retryTimesPredicateWithMatchingRetryAmount() {
final AtomicInteger atomicInteger = new AtomicInteger(3);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Maybe.fromCallable(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}

return true;
}
})
.retry(2, Functions.alwaysTrue())
.test()
.assertResult(true);

assertEquals(3, numberOfSubscribeCalls.get());
}

@Test
public void retryTimesPredicateWithNotMatchingRetryAmount() {
final AtomicInteger atomicInteger = new AtomicInteger(3);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Maybe.fromCallable(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}

return true;
}
})
.retry(1, Functions.alwaysTrue())
.test()
.assertFailure(RuntimeException.class);

assertEquals(2, numberOfSubscribeCalls.get());
}

@Test
public void retryTimesPredicateWithZeroRetries() {
final AtomicInteger atomicInteger = new AtomicInteger(2);
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);

Maybe.fromCallable(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
numberOfSubscribeCalls.incrementAndGet();

if (atomicInteger.decrementAndGet() != 0) {
throw new RuntimeException();
}

return true;
}
})
.retry(0, Functions.alwaysTrue())
.test()
.assertFailure(RuntimeException.class);

assertEquals(1, numberOfSubscribeCalls.get());
}
}
Loading