diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index a8a10bafb9..bacdd90667 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -1597,6 +1597,30 @@ public final void unsafeSubscribe(Subscriber subscriber) { } } + /** + * Subscribes an Observer to this single and returns a Subscription that allows + * unsubscription. + * + * @param observer the Observer to subscribe + * @return the Subscription that allows unsubscription + */ + public final Subscription subscribe(final Observer observer) { + if (observer == null) { + throw new NullPointerException("observer is null"); + } + return subscribe(new SingleSubscriber() { + @Override + public void onSuccess(T value) { + observer.onNext(value); + observer.onCompleted(); + } + @Override + public void onError(Throwable error) { + observer.onError(error); + } + }); + } + /** * Subscribes to a Single and provides a Subscriber that implements functions to handle the item the Single * emits or any error notification it issues. @@ -2541,4 +2565,75 @@ public final Single retryWhen(final Func1, ? return toObservable().retryWhen(notificationHandler).toSingle(); } + /** + * Constructs an Single that creates a dependent resource object which is disposed of on unsubscription. + *

+ * + *

+ *
Scheduler:
+ *
{@code using} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param resourceFactory + * the factory function to create a resource object that depends on the Single + * @param singleFactory + * the factory function to create a Single + * @param disposeAction + * the function that will dispose of the resource + * @return the Single whose lifetime controls the lifetime of the dependent resource object + * @see ReactiveX operators documentation: Using + */ + @Experimental + public static Single using( + final Func0 resourceFactory, + final Func1> observableFactory, + final Action1 disposeAction) { + return using(resourceFactory, observableFactory, disposeAction, false); + } + + /** + * Constructs an Single that creates a dependent resource object which is disposed of just before + * termination if you have set {@code disposeEagerly} to {@code true} and unsubscription does not occur + * before termination. Otherwise resource disposal will occur on unsubscription. Eager disposal is + * particularly appropriate for a synchronous Single that resuses resources. {@code disposeAction} will + * only be called once per subscription. + *

+ * + *

+ *
Scheduler:
+ *
{@code using} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @warn "Backpressure Support" section missing from javadoc + * @param resourceFactory + * the factory function to create a resource object that depends on the Single + * @param singleFactory + * the factory function to create a Single + * @param disposeAction + * the function that will dispose of the resource + * @param disposeEagerly + * if {@code true} then disposal will happen either on unsubscription or just before emission of + * a terminal event ({@code onComplete} or {@code onError}). + * @return the Single whose lifetime controls the lifetime of the dependent resource object + * @see ReactiveX operators documentation: Using + * @Experimental The behavior of this can change at any time. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public static Single using( + final Func0 resourceFactory, + final Func1> singleFactory, + final Action1 disposeAction, boolean disposeEagerly) { + if (resourceFactory == null) { + throw new NullPointerException("resourceFactory is null"); + } + if (singleFactory == null) { + throw new NullPointerException("singleFactory is null"); + } + if (disposeAction == null) { + throw new NullPointerException("disposeAction is null"); + } + return create(new SingleOnSubscribeUsing(resourceFactory, singleFactory, disposeAction, disposeEagerly)); + } + } diff --git a/src/main/java/rx/internal/operators/SingleOnSubscribeUsing.java b/src/main/java/rx/internal/operators/SingleOnSubscribeUsing.java new file mode 100644 index 0000000000..8bd73a29b0 --- /dev/null +++ b/src/main/java/rx/internal/operators/SingleOnSubscribeUsing.java @@ -0,0 +1,116 @@ +package rx.internal.operators; + +import java.util.Arrays; + +import rx.*; +import rx.exceptions.*; +import rx.functions.*; +import rx.plugins.RxJavaPlugins; + +/** + * Generates a resource, derives a Single from it and disposes that resource once the + * Single terminates. + * @param the value type of the Single + * @param the resource type + */ +public final class SingleOnSubscribeUsing implements Single.OnSubscribe { + final Func0 resourceFactory; + final Func1> singleFactory; + final Action1 disposeAction; + final boolean disposeEagerly; + + public SingleOnSubscribeUsing(Func0 resourceFactory, + Func1> observableFactory, + Action1 disposeAction, boolean disposeEagerly) { + this.resourceFactory = resourceFactory; + this.singleFactory = observableFactory; + this.disposeAction = disposeAction; + this.disposeEagerly = disposeEagerly; + } + + @Override + public void call(final SingleSubscriber child) { + final Resource resource; + + try { + resource = resourceFactory.call(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + child.onError(ex); + return; + } + + Single single; + + try { + single = singleFactory.call(resource); + } catch (Throwable ex) { + handleSubscriptionTimeError(child, resource, ex); + return; + } + + if (single == null) { + handleSubscriptionTimeError(child, resource, new NullPointerException("The single")); + return; + } + + SingleSubscriber parent = new SingleSubscriber() { + @Override + public void onSuccess(T value) { + if (disposeEagerly) { + try { + disposeAction.call(resource); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + + child.onError(ex); + return; + } + } + + child.onSuccess(value); + + if (!disposeEagerly) { + try { + disposeAction.call(resource); + } catch (Throwable ex2) { + Exceptions.throwIfFatal(ex2); + RxJavaPlugins.getInstance().getErrorHandler().handleError(ex2); + } + } + } + + @Override + public void onError(Throwable error) { + handleSubscriptionTimeError(child, resource, error); + } + }; + child.add(parent); + + single.subscribe(parent); + } + + void handleSubscriptionTimeError(SingleSubscriber t, Resource resource, Throwable ex) { + Exceptions.throwIfFatal(ex); + + if (disposeEagerly) { + try { + disposeAction.call(resource); + } catch (Throwable ex2) { + Exceptions.throwIfFatal(ex2); + ex = new CompositeException(Arrays.asList(ex, ex2)); + } + } + + t.onError(ex); + + if (!disposeEagerly) { + try { + disposeAction.call(resource); + } catch (Throwable ex2) { + Exceptions.throwIfFatal(ex2); + RxJavaPlugins.getInstance().getErrorHandler().handleError(ex2); + } + } + } +} diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 17794e4dbb..d2457da4e9 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -12,29 +12,28 @@ */ package rx; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; import java.io.IOException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + import rx.Single.OnSubscribe; -import rx.exceptions.CompositeException; +import rx.exceptions.*; import rx.functions.*; import rx.observers.TestSubscriber; -import rx.schedulers.Schedulers; -import rx.schedulers.TestScheduler; +import rx.schedulers.*; import rx.singles.BlockingSingle; import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; -import static org.junit.Assert.*; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; - public class SingleTest { @Test @@ -1681,4 +1680,38 @@ public void takeUntilError_withSingle_shouldMatch() { assertFalse(until.hasObservers()); assertFalse(ts.isUnsubscribed()); } + + @Test + public void subscribeWithObserver() { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Single.just(1).subscribe(o); + + verify(o).onNext(1); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void subscribeWithObserverAndGetError() { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Single.error(new TestException()).subscribe(o); + + verify(o, never()).onNext(anyInt()); + verify(o, never()).onCompleted(); + verify(o).onError(any(TestException.class)); + } + + @Test + public void subscribeWithNullObserver() { + try { + Single.just(1).subscribe((Observer)null); + fail("Failed to throw NullPointerException"); + } catch (NullPointerException ex) { + assertEquals("observer is null", ex.getMessage()); + } + } } diff --git a/src/test/java/rx/internal/operators/SingleOnSubscribeUsingTest.java b/src/test/java/rx/internal/operators/SingleOnSubscribeUsingTest.java new file mode 100644 index 0000000000..238f373115 --- /dev/null +++ b/src/test/java/rx/internal/operators/SingleOnSubscribeUsingTest.java @@ -0,0 +1,492 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.*; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.*; +import rx.Observer; +import rx.exceptions.TestException; +import rx.functions.*; +import rx.subscriptions.Subscriptions; + +public class SingleOnSubscribeUsingTest { + + private interface Resource { + String getTextFromWeb(); + + void dispose(); + } + + private static class DisposeAction implements Action1 { + + @Override + public void call(Resource r) { + r.dispose(); + } + + } + + private final Action1 disposeSubscription = new Action1() { + + @Override + public void call(Subscription s) { + s.unsubscribe(); + } + + }; + + @Test + public void nonEagerly() { + performTestUsing(false); + } + + @Test + public void eagerly() { + performTestUsing(true); + } + + private void performTestUsing(boolean disposeEagerly) { + final Resource resource = mock(Resource.class); + when(resource.getTextFromWeb()).thenReturn("Hello world!"); + + Func0 resourceFactory = new Func0() { + @Override + public Resource call() { + return resource; + } + }; + + Func1> observableFactory = new Func1>() { + @Override + public Single call(Resource resource) { + return Single.just(resource.getTextFromWeb()); + } + }; + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + Single observable = Single.using(resourceFactory, observableFactory, + new DisposeAction(), disposeEagerly); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer).onNext("Hello world!"); + inOrder.verify(observer).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + // The resouce should be closed + verify(resource).dispose(); + } + + @Test + public void withSubscribingTwice() { + performTestUsingWithSubscribingTwice(false); + } + + @Test + public void withSubscribingTwiceDisposeEagerly() { + performTestUsingWithSubscribingTwice(true); + } + + private void performTestUsingWithSubscribingTwice(boolean disposeEagerly) { + // When subscribe is called, a new resource should be created. + Func0 resourceFactory = new Func0() { + @Override + public Resource call() { + return new Resource() { + + boolean first = true; + + @Override + public String getTextFromWeb() { + if (first) { + first = false; + return "Hello world!"; + } + return "Nothing"; + } + + @Override + public void dispose() { + // do nothing + } + + }; + } + }; + + Func1> observableFactory = new Func1>() { + @Override + public Single call(Resource resource) { + return Single.just(resource.getTextFromWeb()); + } + }; + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + Single observable = Single.using(resourceFactory, observableFactory, + new DisposeAction(), disposeEagerly); + observable.subscribe(observer); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer).onNext("Hello world!"); + inOrder.verify(observer).onCompleted(); + + inOrder.verify(observer).onNext("Hello world!"); + inOrder.verify(observer).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test(expected = TestException.class) + public void withResourceFactoryError() { + performTestUsingWithResourceFactoryError(false); + } + + @Test(expected = TestException.class) + public void withResourceFactoryErrorDisposeEagerly() { + performTestUsingWithResourceFactoryError(true); + } + + private void performTestUsingWithResourceFactoryError(boolean disposeEagerly) { + Func0 resourceFactory = new Func0() { + @Override + public Subscription call() { + throw new TestException(); + } + }; + + Func1> observableFactory = new Func1>() { + @Override + public Single call(Subscription subscription) { + return Single.just(1); + } + }; + + Single.using(resourceFactory, observableFactory, disposeSubscription) + .toBlocking().value(); + } + + @Test + public void withSingleFactoryError() { + performTestUsingWithSingleFactoryError(false); + } + + @Test + public void withSingleFactoryErrorDisposeEagerly() { + performTestUsingWithSingleFactoryError(true); + } + + private void performTestUsingWithSingleFactoryError(boolean disposeEagerly) { + final Action0 unsubscribe = mock(Action0.class); + Func0 resourceFactory = new Func0() { + @Override + public Subscription call() { + return Subscriptions.create(unsubscribe); + } + }; + + Func1> observableFactory = new Func1>() { + @Override + public Single call(Subscription subscription) { + throw new TestException(); + } + }; + + try { + Single.using(resourceFactory, observableFactory, disposeSubscription) + .toBlocking().value(); + fail("Should throw a TestException when the observableFactory throws it"); + } catch (TestException e) { + // Make sure that unsubscribe is called so that users can close + // the resource if some error happens. + verify(unsubscribe).call(); + } + } + + @Test + public void withSingleFactoryErrorInOnSubscribe() { + performTestUsingWithSingleFactoryErrorInOnSubscribe(false); + } + + @Test + public void withSingleFactoryErrorInOnSubscribeDisposeEagerly() { + performTestUsingWithSingleFactoryErrorInOnSubscribe(true); + } + + private void performTestUsingWithSingleFactoryErrorInOnSubscribe(boolean disposeEagerly) { + final Action0 unsubscribe = mock(Action0.class); + Func0 resourceFactory = new Func0() { + @Override + public Subscription call() { + return Subscriptions.create(unsubscribe); + } + }; + + Func1> observableFactory = new Func1>() { + @Override + public Single call(Subscription subscription) { + return Single.create(new Single.OnSubscribe() { + @Override + public void call(SingleSubscriber t1) { + throw new TestException(); + } + }); + } + }; + + try { + Single + .using(resourceFactory, observableFactory, disposeSubscription, disposeEagerly) + .toBlocking().value(); + fail("Should throw a TestException when the observableFactory throws it"); + } catch (TestException e) { + // Make sure that unsubscribe is called so that users can close + // the resource if some error happens. + verify(unsubscribe).call(); + } + } + + @Test + public void disposesEagerlyBeforeCompletion() { + final List events = new ArrayList(); + Func0 resourceFactory = createResourceFactory(events); + final Action1 completion = createOnSuccessAction(events); + final Action0 unsub =createUnsubAction(events); + + Func1> observableFactory = new Func1>() { + @Override + public Single call(Resource resource) { + return Single.just(resource.getTextFromWeb()); + } + }; + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + Single observable = Single.using(resourceFactory, observableFactory, + new DisposeAction(), true).doOnUnsubscribe(unsub) + .doOnSuccess(completion); + observable.subscribe(observer); + + assertEquals(Arrays.asList("disposed", "completed", "unsub"), events); + + } + + @Test + public void doesNotDisposesEagerlyBeforeCompletion() { + final List events = new ArrayList(); + Func0 resourceFactory = createResourceFactory(events); + final Action1 completion = createOnSuccessAction(events); + final Action0 unsub =createUnsubAction(events); + + Func1> observableFactory = new Func1>() { + @Override + public Single call(Resource resource) { + return Single.just(resource.getTextFromWeb()); + } + }; + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + Single observable = Single.using(resourceFactory, observableFactory, + new DisposeAction(), false).doOnUnsubscribe(unsub) + .doOnSuccess(completion); + observable.subscribe(observer); + + assertEquals(Arrays.asList("completed", "unsub", "disposed"), events); + + } + + + + @Test + public void disposesEagerlyBeforeError() { + final List events = new ArrayList(); + Func0 resourceFactory = createResourceFactory(events); + final Action1 onError = createOnErrorAction(events); + final Action0 unsub = createUnsubAction(events); + + Func1> observableFactory = new Func1>() { + @Override + public Single call(Resource resource) { + return Single.error(new RuntimeException()); + } + }; + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + Single observable = Single.using(resourceFactory, observableFactory, + new DisposeAction(), true).doOnUnsubscribe(unsub) + .doOnError(onError); + observable.subscribe(observer); + + assertEquals(Arrays.asList("disposed", "error", "unsub"), events); + + } + + @Test + public void doesNotDisposesEagerlyBeforeError() { + final List events = new ArrayList(); + Func0 resourceFactory = createResourceFactory(events); + final Action1 onError = createOnErrorAction(events); + final Action0 unsub = createUnsubAction(events); + + Func1> observableFactory = new Func1>() { + @Override + public Single call(Resource resource) { + return Single.error(new RuntimeException()); + } + }; + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + Single observable = Single.using(resourceFactory, observableFactory, + new DisposeAction(), false).doOnUnsubscribe(unsub) + .doOnError(onError); + observable.subscribe(observer); + + assertEquals(Arrays.asList("error", "unsub", "disposed"), events); + } + + private static Action0 createUnsubAction(final List events) { + return new Action0() { + @Override + public void call() { + events.add("unsub"); + } + }; + } + + private static Action1 createOnErrorAction(final List events) { + return new Action1() { + @Override + public void call(Throwable t) { + events.add("error"); + } + }; + } + + private static Func0 createResourceFactory(final List events) { + return new Func0() { + @Override + public Resource call() { + return new Resource() { + + @Override + public String getTextFromWeb() { + return "hello world"; + } + + @Override + public void dispose() { + events.add("disposed"); + } + }; + } + }; + } + + private static Action1 createOnSuccessAction(final List events) { + return new Action1() { + @Override + public void call(String s) { + events.add("completed"); + } + }; + } + + @Test + public void nullResourceFactory() { + try { + final Resource resource = mock(Resource.class); + when(resource.getTextFromWeb()).thenReturn("Hello world!"); + + Func1> observableFactory = new Func1>() { + @Override + public Single call(Resource resource) { + return Single.just(resource.getTextFromWeb()); + } + }; + + Single.using(null, observableFactory, + new DisposeAction(), false); + + fail("Failed to throw NullPointerException"); + } catch (NullPointerException ex) { + assertEquals("resourceFactory is null", ex.getMessage()); + } + } + + @Test + public void nullSingeFactory() { + try { + final Resource resource = mock(Resource.class); + when(resource.getTextFromWeb()).thenReturn("Hello world!"); + + Func0 resourceFactory = new Func0() { + @Override + public Resource call() { + return resource; + } + }; + + Single.using(resourceFactory, null, + new DisposeAction(), false); + + fail("Failed to throw NullPointerException"); + } catch (NullPointerException ex) { + assertEquals("singleFactory is null", ex.getMessage()); + } + } + + @Test + public void nullDisposeAction() { + try { + final Resource resource = mock(Resource.class); + when(resource.getTextFromWeb()).thenReturn("Hello world!"); + + Func0 resourceFactory = new Func0() { + @Override + public Resource call() { + return resource; + } + }; + + Func1> observableFactory = new Func1>() { + @Override + public Single call(Resource resource) { + return Single.just(resource.getTextFromWeb()); + } + }; + + Single.using(resourceFactory, observableFactory, + null, false); + + fail("Failed to throw NullPointerException"); + } catch (NullPointerException ex) { + assertEquals("disposeAction is null", ex.getMessage()); + } + } + +}