Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: Single.using() #3752

Merged
merged 1 commit into from
Mar 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,30 @@ public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
}
}

/**
* Subscribes an Observer to this single and returns a Subscription that allows
* unsubscription.
*
* @param observer the Observer to subscribe
* @return the Subscription that allows unsubscription
*/
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
observer.onNext(value);
observer.onCompleted();
}
@Override
public void onError(Throwable error) {
observer.onError(error);
}
});
}

/**
* Subscribes to a Single and provides a Subscriber that implements functions to handle the item the Single
* emits or any error notification it issues.
Expand Down Expand Up @@ -2541,4 +2565,75 @@ public final Single<T> retryWhen(final Func1<Observable<? extends Throwable>, ?
return toObservable().retryWhen(notificationHandler).toSingle();
}

/**
* Constructs an Single that creates a dependent resource object which is disposed of on unsubscription.
* <p>
* <img width="640" height="400" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/using.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code using} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param resourceFactory
* the factory function to create a resource object that depends on the Single
* @param singleFactory
* the factory function to create a Single
* @param disposeAction
* the function that will dispose of the resource
* @return the Single whose lifetime controls the lifetime of the dependent resource object
* @see <a href="http://reactivex.io/documentation/operators/using.html">ReactiveX operators documentation: Using</a>
*/
@Experimental
public static <T, Resource> Single<T> using(
final Func0<Resource> resourceFactory,
final Func1<? super Resource, ? extends Single<? extends T>> observableFactory,
final Action1<? super Resource> disposeAction) {
return using(resourceFactory, observableFactory, disposeAction, false);
}

/**
* Constructs an Single that creates a dependent resource object which is disposed of just before
* termination if you have set {@code disposeEagerly} to {@code true} and unsubscription does not occur
* before termination. Otherwise resource disposal will occur on unsubscription. Eager disposal is
* particularly appropriate for a synchronous Single that resuses resources. {@code disposeAction} will
* only be called once per subscription.
* <p>
* <img width="640" height="400" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/using.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code using} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @warn "Backpressure Support" section missing from javadoc
* @param resourceFactory
* the factory function to create a resource object that depends on the Single
* @param singleFactory
* the factory function to create a Single
* @param disposeAction
* the function that will dispose of the resource
* @param disposeEagerly
* if {@code true} then disposal will happen either on unsubscription or just before emission of
* a terminal event ({@code onComplete} or {@code onError}).
* @return the Single whose lifetime controls the lifetime of the dependent resource object
* @see <a href="http://reactivex.io/documentation/operators/using.html">ReactiveX operators documentation: Using</a>
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public static <T, Resource> Single<T> using(
final Func0<Resource> resourceFactory,
final Func1<? super Resource, ? extends Single<? extends T>> singleFactory,
final Action1<? super Resource> disposeAction, boolean disposeEagerly) {
if (resourceFactory == null) {
throw new NullPointerException("resourceFactory is null");
}
if (singleFactory == null) {
throw new NullPointerException("singleFactory is null");
}
if (disposeAction == null) {
throw new NullPointerException("disposeAction is null");
}
return create(new SingleOnSubscribeUsing<T, Resource>(resourceFactory, singleFactory, disposeAction, disposeEagerly));
}

}
116 changes: 116 additions & 0 deletions src/main/java/rx/internal/operators/SingleOnSubscribeUsing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package rx.internal.operators;

import java.util.Arrays;

import rx.*;
import rx.exceptions.*;
import rx.functions.*;
import rx.plugins.RxJavaPlugins;

/**
* Generates a resource, derives a Single from it and disposes that resource once the
* Single terminates.
* @param <T> the value type of the Single
* @param <Resource> the resource type
*/
public final class SingleOnSubscribeUsing<T, Resource> implements Single.OnSubscribe<T> {
final Func0<Resource> resourceFactory;
final Func1<? super Resource, ? extends Single<? extends T>> singleFactory;
final Action1<? super Resource> disposeAction;
final boolean disposeEagerly;

public SingleOnSubscribeUsing(Func0<Resource> resourceFactory,
Func1<? super Resource, ? extends Single<? extends T>> observableFactory,
Action1<? super Resource> disposeAction, boolean disposeEagerly) {
this.resourceFactory = resourceFactory;
this.singleFactory = observableFactory;
this.disposeAction = disposeAction;
this.disposeEagerly = disposeEagerly;
}

@Override
public void call(final SingleSubscriber<? super T> child) {
final Resource resource;

try {
resource = resourceFactory.call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
child.onError(ex);
return;
}

Single<? extends T> single;

try {
single = singleFactory.call(resource);
} catch (Throwable ex) {
handleSubscriptionTimeError(child, resource, ex);
return;
}

if (single == null) {
handleSubscriptionTimeError(child, resource, new NullPointerException("The single"));
return;
}

SingleSubscriber<T> parent = new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
if (disposeEagerly) {
try {
disposeAction.call(resource);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);

child.onError(ex);
return;
}
}

child.onSuccess(value);

if (!disposeEagerly) {
try {
disposeAction.call(resource);
} catch (Throwable ex2) {
Exceptions.throwIfFatal(ex2);
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex2);
}
}
}

@Override
public void onError(Throwable error) {
handleSubscriptionTimeError(child, resource, error);
}
};
child.add(parent);

single.subscribe(parent);
}

void handleSubscriptionTimeError(SingleSubscriber<? super T> t, Resource resource, Throwable ex) {
Exceptions.throwIfFatal(ex);

if (disposeEagerly) {
try {
disposeAction.call(resource);
} catch (Throwable ex2) {
Exceptions.throwIfFatal(ex2);
ex = new CompositeException(Arrays.asList(ex, ex2));
}
}

t.onError(ex);

if (!disposeEagerly) {
try {
disposeAction.call(resource);
} catch (Throwable ex2) {
Exceptions.throwIfFatal(ex2);
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex2);
}
}
}
}
53 changes: 43 additions & 10 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,28 @@
*/
package rx;

import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import rx.Single.OnSubscribe;
import rx.exceptions.CompositeException;
import rx.exceptions.*;
import rx.functions.*;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import rx.schedulers.*;
import rx.singles.BlockingSingle;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

import static org.junit.Assert.*;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;

public class SingleTest {

@Test
Expand Down Expand Up @@ -1681,4 +1680,38 @@ public void takeUntilError_withSingle_shouldMatch() {
assertFalse(until.hasObservers());
assertFalse(ts.isUnsubscribed());
}

@Test
public void subscribeWithObserver() {
@SuppressWarnings("unchecked")
Observer<Integer> o = mock(Observer.class);

Single.just(1).subscribe(o);

verify(o).onNext(1);
verify(o).onCompleted();
verify(o, never()).onError(any(Throwable.class));
}

@Test
public void subscribeWithObserverAndGetError() {
@SuppressWarnings("unchecked")
Observer<Integer> o = mock(Observer.class);

Single.<Integer>error(new TestException()).subscribe(o);

verify(o, never()).onNext(anyInt());
verify(o, never()).onCompleted();
verify(o).onError(any(TestException.class));
}

@Test
public void subscribeWithNullObserver() {
try {
Single.just(1).subscribe((Observer<Integer>)null);
fail("Failed to throw NullPointerException");
} catch (NullPointerException ex) {
assertEquals("observer is null", ex.getMessage());
}
}
}
Loading