Skip to content

Commit

Permalink
Merge pull request #3752 from akarnokd/SingleUsing1x
Browse files Browse the repository at this point in the history
1.x: Single.using()
  • Loading branch information
akarnokd committed Mar 14, 2016
2 parents 662ce3b + 1be11d6 commit ad73819
Show file tree
Hide file tree
Showing 4 changed files with 746 additions and 10 deletions.
95 changes: 95 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,30 @@ public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
}
}

/**
* Subscribes an Observer to this single and returns a Subscription that allows
* unsubscription.
*
* @param observer the Observer to subscribe
* @return the Subscription that allows unsubscription
*/
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
observer.onNext(value);
observer.onCompleted();
}
@Override
public void onError(Throwable error) {
observer.onError(error);
}
});
}

/**
* Subscribes to a Single and provides a Subscriber that implements functions to handle the item the Single
* emits or any error notification it issues.
Expand Down Expand Up @@ -2541,4 +2565,75 @@ public final Single<T> retryWhen(final Func1<Observable<? extends Throwable>, ?
return toObservable().retryWhen(notificationHandler).toSingle();
}

/**
* Constructs an Single that creates a dependent resource object which is disposed of on unsubscription.
* <p>
* <img width="640" height="400" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/using.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code using} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param resourceFactory
* the factory function to create a resource object that depends on the Single
* @param singleFactory
* the factory function to create a Single
* @param disposeAction
* the function that will dispose of the resource
* @return the Single whose lifetime controls the lifetime of the dependent resource object
* @see <a href="http://reactivex.io/documentation/operators/using.html">ReactiveX operators documentation: Using</a>
*/
@Experimental
public static <T, Resource> Single<T> using(
final Func0<Resource> resourceFactory,
final Func1<? super Resource, ? extends Single<? extends T>> observableFactory,
final Action1<? super Resource> disposeAction) {
return using(resourceFactory, observableFactory, disposeAction, false);
}

/**
* Constructs an Single that creates a dependent resource object which is disposed of just before
* termination if you have set {@code disposeEagerly} to {@code true} and unsubscription does not occur
* before termination. Otherwise resource disposal will occur on unsubscription. Eager disposal is
* particularly appropriate for a synchronous Single that resuses resources. {@code disposeAction} will
* only be called once per subscription.
* <p>
* <img width="640" height="400" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/using.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code using} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @warn "Backpressure Support" section missing from javadoc
* @param resourceFactory
* the factory function to create a resource object that depends on the Single
* @param singleFactory
* the factory function to create a Single
* @param disposeAction
* the function that will dispose of the resource
* @param disposeEagerly
* if {@code true} then disposal will happen either on unsubscription or just before emission of
* a terminal event ({@code onComplete} or {@code onError}).
* @return the Single whose lifetime controls the lifetime of the dependent resource object
* @see <a href="http://reactivex.io/documentation/operators/using.html">ReactiveX operators documentation: Using</a>
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public static <T, Resource> Single<T> using(
final Func0<Resource> resourceFactory,
final Func1<? super Resource, ? extends Single<? extends T>> singleFactory,
final Action1<? super Resource> disposeAction, boolean disposeEagerly) {
if (resourceFactory == null) {
throw new NullPointerException("resourceFactory is null");
}
if (singleFactory == null) {
throw new NullPointerException("singleFactory is null");
}
if (disposeAction == null) {
throw new NullPointerException("disposeAction is null");
}
return create(new SingleOnSubscribeUsing<T, Resource>(resourceFactory, singleFactory, disposeAction, disposeEagerly));
}

}
116 changes: 116 additions & 0 deletions src/main/java/rx/internal/operators/SingleOnSubscribeUsing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package rx.internal.operators;

import java.util.Arrays;

import rx.*;
import rx.exceptions.*;
import rx.functions.*;
import rx.plugins.RxJavaPlugins;

/**
* Generates a resource, derives a Single from it and disposes that resource once the
* Single terminates.
* @param <T> the value type of the Single
* @param <Resource> the resource type
*/
public final class SingleOnSubscribeUsing<T, Resource> implements Single.OnSubscribe<T> {
final Func0<Resource> resourceFactory;
final Func1<? super Resource, ? extends Single<? extends T>> singleFactory;
final Action1<? super Resource> disposeAction;
final boolean disposeEagerly;

public SingleOnSubscribeUsing(Func0<Resource> resourceFactory,
Func1<? super Resource, ? extends Single<? extends T>> observableFactory,
Action1<? super Resource> disposeAction, boolean disposeEagerly) {
this.resourceFactory = resourceFactory;
this.singleFactory = observableFactory;
this.disposeAction = disposeAction;
this.disposeEagerly = disposeEagerly;
}

@Override
public void call(final SingleSubscriber<? super T> child) {
final Resource resource;

try {
resource = resourceFactory.call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
child.onError(ex);
return;
}

Single<? extends T> single;

try {
single = singleFactory.call(resource);
} catch (Throwable ex) {
handleSubscriptionTimeError(child, resource, ex);
return;
}

if (single == null) {
handleSubscriptionTimeError(child, resource, new NullPointerException("The single"));
return;
}

SingleSubscriber<T> parent = new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
if (disposeEagerly) {
try {
disposeAction.call(resource);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);

child.onError(ex);
return;
}
}

child.onSuccess(value);

if (!disposeEagerly) {
try {
disposeAction.call(resource);
} catch (Throwable ex2) {
Exceptions.throwIfFatal(ex2);
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex2);
}
}
}

@Override
public void onError(Throwable error) {
handleSubscriptionTimeError(child, resource, error);
}
};
child.add(parent);

single.subscribe(parent);
}

void handleSubscriptionTimeError(SingleSubscriber<? super T> t, Resource resource, Throwable ex) {
Exceptions.throwIfFatal(ex);

if (disposeEagerly) {
try {
disposeAction.call(resource);
} catch (Throwable ex2) {
Exceptions.throwIfFatal(ex2);
ex = new CompositeException(Arrays.asList(ex, ex2));
}
}

t.onError(ex);

if (!disposeEagerly) {
try {
disposeAction.call(resource);
} catch (Throwable ex2) {
Exceptions.throwIfFatal(ex2);
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex2);
}
}
}
}
53 changes: 43 additions & 10 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,28 @@
*/
package rx;

import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import rx.Single.OnSubscribe;
import rx.exceptions.CompositeException;
import rx.exceptions.*;
import rx.functions.*;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import rx.schedulers.*;
import rx.singles.BlockingSingle;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

import static org.junit.Assert.*;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;

public class SingleTest {

@Test
Expand Down Expand Up @@ -1681,4 +1680,38 @@ public void takeUntilError_withSingle_shouldMatch() {
assertFalse(until.hasObservers());
assertFalse(ts.isUnsubscribed());
}

@Test
public void subscribeWithObserver() {
@SuppressWarnings("unchecked")
Observer<Integer> o = mock(Observer.class);

Single.just(1).subscribe(o);

verify(o).onNext(1);
verify(o).onCompleted();
verify(o, never()).onError(any(Throwable.class));
}

@Test
public void subscribeWithObserverAndGetError() {
@SuppressWarnings("unchecked")
Observer<Integer> o = mock(Observer.class);

Single.<Integer>error(new TestException()).subscribe(o);

verify(o, never()).onNext(anyInt());
verify(o, never()).onCompleted();
verify(o).onError(any(TestException.class));
}

@Test
public void subscribeWithNullObserver() {
try {
Single.just(1).subscribe((Observer<Integer>)null);
fail("Failed to throw NullPointerException");
} catch (NullPointerException ex) {
assertEquals("observer is null", ex.getMessage());
}
}
}
Loading

0 comments on commit ad73819

Please sign in to comment.