From a6b39902b83b545078be4db08afd067927605a65 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 24 Apr 2014 13:12:18 +0200 Subject: [PATCH] OperatorDefer --- rxjava-core/src/main/java/rx/Observable.java | 4 +-- ...OperationDefer.java => OperatorDefer.java} | 32 +++++++++++-------- ...nDeferTest.java => OperatorDeferTest.java} | 27 ++++++++++++---- 3 files changed, 40 insertions(+), 23 deletions(-) rename rxjava-core/src/main/java/rx/operators/{OperationDefer.java => OperatorDefer.java} (65%) rename rxjava-core/src/test/java/rx/operators/{OperationDeferTest.java => OperatorDeferTest.java} (75%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 7fea0a1fb1..03ca49c5c9 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -53,7 +53,6 @@ import rx.operators.OperationConcat; import rx.operators.OperationDebounce; import rx.operators.OperationDefaultIfEmpty; -import rx.operators.OperationDefer; import rx.operators.OperationDelay; import rx.operators.OperationDematerialize; import rx.operators.OperationDistinct; @@ -95,6 +94,7 @@ import rx.operators.OperatorAsObservable; import rx.operators.OperatorCache; import rx.operators.OperatorCast; +import rx.operators.OperatorDefer; import rx.operators.OperatorDoOnEach; import rx.operators.OperatorElementAt; import rx.operators.OperatorFilter; @@ -988,7 +988,7 @@ public final static Observable concat(Observable t1, Observa * @see RxJava Wiki: defer() */ public final static Observable defer(Func0> observableFactory) { - return create(OperationDefer.defer(observableFactory)); + return create(new OperatorDefer(observableFactory)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationDefer.java b/rxjava-core/src/main/java/rx/operators/OperatorDefer.java similarity index 65% rename from rxjava-core/src/main/java/rx/operators/OperationDefer.java rename to rxjava-core/src/main/java/rx/operators/OperatorDefer.java index 734f6bbf19..a18d350c4e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDefer.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorDefer.java @@ -16,11 +16,9 @@ package rx.operators; import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscription; +import rx.Observable.OnSubscribe; +import rx.Subscriber; import rx.functions.Func0; -import rx.observers.Subscribers; /** * Do not create the Observable until an Observer subscribes; create a fresh Observable on each @@ -32,17 +30,23 @@ * return an Observable that will call this function to generate its Observable sequence afresh * each time a new Observer subscribes. */ -public final class OperationDefer { +public final class OperatorDefer implements OnSubscribe { + final Func0> observableFactory; - public static OnSubscribeFunc defer(final Func0> observableFactory) { - - return new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer observer) { - Observable obs = observableFactory.call(); - return obs.unsafeSubscribe(Subscribers.from(observer)); - } - }; + public OperatorDefer(Func0> observableFactory) { + this.observableFactory = observableFactory; + } + @Override + public void call(Subscriber s) { + Observable o; + try { + o = observableFactory.call(); + } catch (Throwable t) { + s.onError(t); + return; + } + o.unsafeSubscribe(s); } + } diff --git a/rxjava-core/src/test/java/rx/operators/OperationDeferTest.java b/rxjava-core/src/test/java/rx/operators/OperatorDeferTest.java similarity index 75% rename from rxjava-core/src/test/java/rx/operators/OperationDeferTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorDeferTest.java index d6e0f5cef0..710b75f794 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDeferTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorDeferTest.java @@ -15,11 +15,7 @@ */ package rx.operators; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import org.junit.Test; @@ -27,10 +23,10 @@ import rx.Observer; import rx.functions.Func0; -public class OperationDeferTest { +@SuppressWarnings("unchecked") +public class OperatorDeferTest { @Test - @SuppressWarnings("unchecked") public void testDefer() throws Throwable { Func0> factory = mock(Func0.class); @@ -64,4 +60,21 @@ public void testDefer() throws Throwable { verify(secondObserver, times(1)).onCompleted(); } + + @Test + public void testDeferFunctionThrows() { + Func0> factory = mock(Func0.class); + + when(factory.call()).thenThrow(new OperationReduceTest.CustomException()); + + Observable result = Observable.defer(factory); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + verify(o).onError(any(OperationReduceTest.CustomException.class)); + verify(o, never()).onNext(any(String.class)); + verify(o, never()).onCompleted(); + } }