Skip to content

Commit

Permalink
Add Single.onErrorResumeNext(Single)
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-zinnatullin committed Jan 31, 2016
1 parent 503d369 commit 67ef32c
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 3 deletions.
31 changes: 31 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,37 @@ public final Single<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunctio
return lift(new OperatorOnErrorReturn<T>(resumeFunction));
}

/**
* Instructs a Single to pass control to another Single rather than invoking
* {@link Observer#onError(Throwable)} if it encounters an error.
* <p/>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorResumeNext.png" alt="">
* <p/>
* By default, when a Single encounters an error that prevents it from emitting the expected item to
* its {@link Observer}, the Single invokes its Observer's {@code onError} method, and then quits
* without invoking any more of its Observer's methods. The {@code onErrorResumeNext} method changes this
* behavior. If you pass another Single ({@code resumeSingleInCaseOfError}) to an Single's
* {@code onErrorResumeNext} method, if the original Single encounters an error, instead of invoking its
* Observer's {@code onError} method, it will instead relinquish control to {@code resumeSingleInCaseOfError} which
* will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case,
* because no Single necessarily invokes {@code onError}, the Observer may never know that an error
* happened.
* <p/>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorResumeNext} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param resumeSingleInCaseOfError a Single that will take control if source Single encounters an error.
* @return the original Single, with appropriately modified behavior.
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
*/
public final Single<T> onErrorResumeNext(Single<? extends T> resumeSingleInCaseOfError) {
return new Single<T>(new SingleOperatorOnErrorResumeNextViaSingle<T>(this, resumeSingleInCaseOfError));
}

/**
* Subscribes to a Single but ignore its emission or notification.
* <dl>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package rx.internal.operators;

import rx.Single;
import rx.SingleSubscriber;
import rx.plugins.RxJavaPlugins;

public class SingleOperatorOnErrorResumeNextViaSingle<T> implements Single.OnSubscribe<T> {

private final Single<? extends T> originalSingle;
private final Single<? extends T> resumeSingleInCaseOfError;

public SingleOperatorOnErrorResumeNextViaSingle(Single<? extends T> originalSingle, Single<? extends T> resumeSingleInCaseOfError) {
if (originalSingle == null) {
throw new NullPointerException("originalSingle must not be null");
}

if (resumeSingleInCaseOfError == null) {
throw new NullPointerException("resumeSingleInCaseOfError must not be null");
}

this.originalSingle = originalSingle;
this.resumeSingleInCaseOfError = resumeSingleInCaseOfError;
}

@Override
public void call(final SingleSubscriber<? super T> child) {
final SingleSubscriber<? super T> parent = new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
child.onSuccess(value);
}

@Override
public void onError(Throwable error) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(error);
unsubscribe();

resumeSingleInCaseOfError.subscribe(child);
}
};

child.add(parent);
originalSingle.subscribe(parent);
}
}
42 changes: 39 additions & 3 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/**
* Copyright 2015 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
Expand Down Expand Up @@ -1182,6 +1182,42 @@ public void doAfterTerminateActionShouldNotBeInvokedUntilSubscriberSubscribes()
verifyZeroInteractions(action);
}

@Test
public void onErrorResumeNextViaSingleShouldNotInterruptSuccessfulSingle() {
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.just("success")
.onErrorResumeNext(Single.just("fail"))
.subscribe(testSubscriber);

testSubscriber.assertValue("success");
}

@Test
public void onErrorResumeNextViaSingleShouldResumeWithPassedSingleInCaseOfError() {
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.<String>error(new RuntimeException("test exception"))
.onErrorResumeNext(Single.just("fallback"))
.subscribe(testSubscriber);

testSubscriber.assertValue("fallback");
}

@Test
public void onErrorResumeNextViaSingleShouldPreventNullSingle() {
try {
Single
.just("value")
.onErrorResumeNext(null);
fail();
} catch (NullPointerException expected) {
assertEquals("resumeSingleInCaseOfError must not be null", expected.getMessage());
}
}

@Test(expected = NullPointerException.class)
public void iterableToArrayShouldThrowNullPointerExceptionIfIterableNull() {
Single.iterableToArray(null);
Expand Down

0 comments on commit 67ef32c

Please sign in to comment.