diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 746362ca78..5795439b25 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -57,7 +57,6 @@ import rx.operators.OperationMergeDelayError; import rx.operators.OperationMergeMaxConcurrent; import rx.operators.OperationMulticast; -import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationParallelMerge; @@ -108,6 +107,7 @@ import rx.operators.OperatorObserveOn; import rx.operators.OperatorOnErrorFlatMap; import rx.operators.OperatorOnErrorResumeNextViaFunction; +import rx.operators.OperatorOnErrorResumeNextViaObservable; import rx.operators.OperatorParallel; import rx.operators.OperatorPivot; import rx.operators.OperatorRepeat; @@ -4523,7 +4523,7 @@ public final Observable onErrorResumeNext(final Func1RxJava Wiki: onErrorResumeNext() */ public final Observable onErrorResumeNext(final Observable resumeSequence) { - return create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(this, resumeSequence)); + return lift(new OperatorOnErrorResumeNextViaObservable(resumeSequence)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java deleted file mode 100644 index dd6e662918..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.atomic.AtomicReference; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action0; -import rx.observers.Subscribers; -import rx.subscriptions.Subscriptions; - -/** - * Instruct an Observable to pass control to another Observable rather than invoking - * onError if it encounters an error. - *

- * - *

- * By default, when an Observable encounters an error that prevents it from emitting the expected - * item to its Observer, the Observable invokes its Observer's onError method, and - * then quits without invoking any more of its Observer's methods. The onErrorResumeNext operation - * changes this behavior. If you pass an Observable (resumeSequence) to onErrorResumeNext, if the - * source Observable encounters an error, instead of invoking its Observer's onError - * method, it will instead relinquish control to this new Observable, which will invoke the - * Observer's onNext method if it is able to do so. In such a case, because no - * Observable necessarily invokes onError, the Observer may never know that an error - * happened. - *

- * You can use this to prevent errors from propagating or to supply fallback data should errors be - * encountered. - */ -public final class OperationOnErrorResumeNextViaObservable { - - public static OnSubscribeFunc onErrorResumeNextViaObservable(Observable originalSequence, Observable resumeSequence) { - return new OnErrorResumeNextViaObservable(originalSequence, resumeSequence); - } - - private static class OnErrorResumeNextViaObservable implements OnSubscribeFunc { - - private final Observable resumeSequence; - private final Observable originalSequence; - - public OnErrorResumeNextViaObservable(Observable originalSequence, Observable resumeSequence) { - this.resumeSequence = resumeSequence; - this.originalSequence = originalSequence; - } - - public Subscription onSubscribe(final Observer observer) { - final SafeObservableSubscription subscription = new SafeObservableSubscription(); - - // AtomicReference since we'll be accessing/modifying this across threads so we can switch it if needed - final AtomicReference subscriptionRef = new AtomicReference(subscription); - - // subscribe to the original Observable and remember the subscription - subscription.wrap(originalSequence.unsafeSubscribe(new Subscriber() { - public void onNext(T value) { - // forward the successful calls unless resumed - if (subscriptionRef.get() == subscription) - observer.onNext(value); - } - - /** - * Instead of passing the onError forward, we intercept and "resume" with the resumeSequence. - */ - public void onError(Throwable ex) { - /* remember what the current subscription is so we can determine if someone unsubscribes concurrently */ - SafeObservableSubscription currentSubscription = subscriptionRef.get(); - // check that we have not been unsubscribed and not already resumed before we can process the error - if (currentSubscription == subscription) { - /* error occurred, so switch subscription to the 'resumeSequence' */ - SafeObservableSubscription innerSubscription = new SafeObservableSubscription(resumeSequence.unsafeSubscribe(Subscribers.from(observer))); - /* we changed the sequence, so also change the subscription to the one of the 'resumeSequence' instead */ - if (!subscriptionRef.compareAndSet(currentSubscription, innerSubscription)) { - // we failed to set which means 'subscriptionRef' was set to NULL via the unsubscribe below - // so we want to immediately unsubscribe from the resumeSequence we just subscribed to - innerSubscription.unsubscribe(); - } - } - } - - public void onCompleted() { - // forward the successful calls unless resumed - if (subscriptionRef.get() == subscription) - observer.onCompleted(); - } - })); - - return Subscriptions.create(new Action0() { - public void call() { - // this will get either the original, or the resumeSequence one and unsubscribe on it - Subscription s = subscriptionRef.getAndSet(null); - if (s != null) { - s.unsubscribe(); - } - } - }); - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorOnErrorResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/operators/OperatorOnErrorResumeNextViaObservable.java new file mode 100644 index 0000000000..4ef61927a0 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorOnErrorResumeNextViaObservable.java @@ -0,0 +1,77 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import rx.Observable; +import rx.Observable.Operator; +import rx.Subscriber; +import rx.subscriptions.SerialSubscription; + +/** + * Instruct an Observable to pass control to another Observable rather than invoking + * onError if it encounters an error. + *

+ * + *

+ * By default, when an Observable encounters an error that prevents it from emitting the expected + * item to its Observer, the Observable invokes its Observer's onError method, and + * then quits without invoking any more of its Observer's methods. The onErrorResumeNext operation + * changes this behavior. If you pass an Observable (resumeSequence) to onErrorResumeNext, if the + * source Observable encounters an error, instead of invoking its Observer's onError + * method, it will instead relinquish control to this new Observable, which will invoke the + * Observer's onNext method if it is able to do so. In such a case, because no + * Observable necessarily invokes onError, the Observer may never know that an error + * happened. + *

+ * You can use this to prevent errors from propagating or to supply fallback data should errors be + * encountered. + * + * @param the value type + */ +public final class OperatorOnErrorResumeNextViaObservable implements Operator { + final Observable resumeSequence; + + public OperatorOnErrorResumeNextViaObservable(Observable resumeSequence) { + this.resumeSequence = resumeSequence; + } + + @Override + public Subscriber call(final Subscriber child) { + // shared subscription won't work here + Subscriber s = new Subscriber() { + @Override + public void onNext(T t) { + child.onNext(t); + } + + @Override + public void onError(Throwable e) { + unsubscribe(); + resumeSequence.unsafeSubscribe(child); + } + + @Override + public void onCompleted() { + child.onCompleted(); + } + + }; + child.add(s); + + return s; + } + +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationOnErrorResumeNextViaObservableTest.java b/rxjava-core/src/test/java/rx/operators/OperatorOnErrorResumeNextViaObservableTest.java similarity index 93% rename from rxjava-core/src/test/java/rx/operators/OperationOnErrorResumeNextViaObservableTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorOnErrorResumeNextViaObservableTest.java index 193b9c9b5a..600add9cee 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationOnErrorResumeNextViaObservableTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorOnErrorResumeNextViaObservableTest.java @@ -20,7 +20,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static rx.operators.OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable; import org.junit.Test; import org.mockito.Mockito; @@ -30,7 +29,7 @@ import rx.Subscription; import rx.functions.Func1; -public class OperationOnErrorResumeNextViaObservableTest { +public class OperatorOnErrorResumeNextViaObservableTest { @Test public void testResumeNext() { @@ -39,7 +38,7 @@ public void testResumeNext() { TestObservable f = new TestObservable(s, "one", "fail", "two", "three"); Observable w = Observable.create(f); Observable resume = Observable.from("twoResume", "threeResume"); - Observable observable = Observable.create(onErrorResumeNextViaObservable(w, resume)); + Observable observable = w.onErrorResumeNext(resume); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -72,6 +71,7 @@ public void testMapResumeAsyncNext() { // Introduce map function that fails intermittently (Map does not prevent this when the observer is a // rx.operator incl onErrorResumeNextViaObservable) w = w.map(new Func1() { + @Override public String call(String s) { if ("fail".equals(s)) throw new RuntimeException("Forced Failure"); @@ -80,7 +80,7 @@ public String call(String s) { } }); - Observable observable = Observable.create(onErrorResumeNextViaObservable(w, resume)); + Observable observable = w.onErrorResumeNext(resume); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class);