Skip to content

Commit

Permalink
OperatorOnErrorResumeNextViaObservable
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored and akarnokd committed Apr 25, 2014
1 parent 95e0636 commit b3e7b7b
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 121 deletions.
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMergeMaxConcurrent;
import rx.operators.OperationMulticast;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallelMerge;
Expand Down Expand Up @@ -108,6 +107,7 @@
import rx.operators.OperatorObserveOn;
import rx.operators.OperatorOnErrorFlatMap;
import rx.operators.OperatorOnErrorResumeNextViaFunction;
import rx.operators.OperatorOnErrorResumeNextViaObservable;
import rx.operators.OperatorParallel;
import rx.operators.OperatorPivot;
import rx.operators.OperatorRepeat;
Expand Down Expand Up @@ -4523,7 +4523,7 @@ public final Observable<T> onErrorResumeNext(final Func1<Throwable, ? extends Ob
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-onerrorresumenext">RxJava Wiki: onErrorResumeNext()</a>
*/
public final Observable<T> onErrorResumeNext(final Observable<? extends T> resumeSequence) {
return create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(this, resumeSequence));
return lift(new OperatorOnErrorResumeNextViaObservable<T>(resumeSequence));
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.operators;

import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.subscriptions.SerialSubscription;

/**
* Instruct an Observable to pass control to another Observable rather than invoking
* <code>onError</code> if it encounters an error.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/onErrorResumeNext.png">
* <p>
* By default, when an Observable encounters an error that prevents it from emitting the expected
* item to its Observer, the Observable invokes its Observer's <code>onError</code> method, and
* then quits without invoking any more of its Observer's methods. The onErrorResumeNext operation
* changes this behavior. If you pass an Observable (resumeSequence) to onErrorResumeNext, if the
* source Observable encounters an error, instead of invoking its Observer's <code>onError</code>
* method, it will instead relinquish control to this new Observable, which will invoke the
* Observer's <code>onNext</code> method if it is able to do so. In such a case, because no
* Observable necessarily invokes <code>onError</code>, the Observer may never know that an error
* happened.
* <p>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
*
* @param <T> the value type
*/
public final class OperatorOnErrorResumeNextViaObservable<T> implements Operator<T, T> {
final Observable<? extends T> resumeSequence;

public OperatorOnErrorResumeNextViaObservable(Observable<? extends T> resumeSequence) {
this.resumeSequence = resumeSequence;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
// shared subscription won't work here
Subscriber<T> s = new Subscriber<T>() {
@Override
public void onNext(T t) {
child.onNext(t);
}

@Override
public void onError(Throwable e) {
unsubscribe();
resumeSequence.unsafeSubscribe(child);
}

@Override
public void onCompleted() {
child.onCompleted();
}

};
child.add(s);

return s;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static rx.operators.OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable;

import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -30,7 +29,7 @@
import rx.Subscription;
import rx.functions.Func1;

public class OperationOnErrorResumeNextViaObservableTest {
public class OperatorOnErrorResumeNextViaObservableTest {

@Test
public void testResumeNext() {
Expand All @@ -39,7 +38,7 @@ public void testResumeNext() {
TestObservable f = new TestObservable(s, "one", "fail", "two", "three");
Observable<String> w = Observable.create(f);
Observable<String> resume = Observable.from("twoResume", "threeResume");
Observable<String> observable = Observable.create(onErrorResumeNextViaObservable(w, resume));
Observable<String> observable = w.onErrorResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down Expand Up @@ -72,6 +71,7 @@ public void testMapResumeAsyncNext() {
// Introduce map function that fails intermittently (Map does not prevent this when the observer is a
// rx.operator incl onErrorResumeNextViaObservable)
w = w.map(new Func1<String, String>() {
@Override
public String call(String s) {
if ("fail".equals(s))
throw new RuntimeException("Forced Failure");
Expand All @@ -80,7 +80,7 @@ public String call(String s) {
}
});

Observable<String> observable = Observable.create(onErrorResumeNextViaObservable(w, resume));
Observable<String> observable = w.onErrorResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down

0 comments on commit b3e7b7b

Please sign in to comment.