Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Observable.fromCallable() as a companion for Observable.defer() #3154

Merged
merged 1 commit into from
Sep 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,29 @@ public final static <T> Observable<T> from(T[] array) {
return from(Arrays.asList(array));
}

/**
* Returns an Observable that invokes passed function and emits its result for each new Observer that subscribes.
* <p>
* Allows you to defer execution of passed function until Observer subscribes to the Observable.
* It makes passed function "lazy".
* Result of the function invocation will be emitted by the Observable.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromCallable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param func
* function which execution should be deferred, it will be invoked when Observer will subscribe to the Observable
* @param <T>
* the type of the item emitted by the Observable
* @return an Observable whose {@link Observer}s' subscriptions trigger an invocation of the given function
* @see #defer(Func0)
*/
@Experimental
public static <T> Observable<T> fromCallable(Callable<? extends T> func) {
return create(new OnSubscribeFromCallable<T>(func));
}

/**
* Returns an Observable that emits a sequential number every specified interval of time.
* <p>
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeFromCallable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package rx.internal.operators;

import rx.Observable;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.internal.producers.SingleDelayedProducer;

import java.util.concurrent.Callable;

/**
* Do not invoke the function until an Observer subscribes; Invokes function on each
* subscription.
* <p>
* Pass {@code fromCallable} a function, and {@code fromCallable} will call this function to emit result of invocation
* afresh each time a new Observer subscribes.
*/
public final class OnSubscribeFromCallable<T> implements Observable.OnSubscribe<T> {

private final Callable<? extends T> resultFactory;

public OnSubscribeFromCallable(Callable<? extends T> resultFactory) {
this.resultFactory = resultFactory;
}

@Override
public void call(Subscriber<? super T> subscriber) {
final SingleDelayedProducer<T> singleDelayedProducer = new SingleDelayedProducer<T>(subscriber);

subscriber.setProducer(singleDelayedProducer);

try {
singleDelayedProducer.setValue(resultFactory.call());
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
subscriber.onError(t);
}
}
}
140 changes: 140 additions & 0 deletions src/test/java/rx/internal/operators/OnSubscribeFromCallableTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package rx.internal.operators;

import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import rx.Observable;
import rx.Observer;
import rx.Subscription;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

import static org.mockito.Mockito.*;
import static rx.schedulers.Schedulers.computation;

public class OnSubscribeFromCallableTest {

@SuppressWarnings("unchecked")
@Test
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
Callable<Object> func = mock(Callable.class);

when(func.call()).thenReturn(new Object());

Observable<Object> fromCallableObservable = Observable.fromCallable(func);

verifyZeroInteractions(func);

fromCallableObservable.subscribe();

verify(func).call();
}

@SuppressWarnings("unchecked")
@Test
public void shouldCallOnNextAndOnCompleted() throws Exception {
Callable<String> func = mock(Callable.class);

when(func.call()).thenReturn("test_value");

Observable<String> fromCallableObservable = Observable.fromCallable(func);

Observer<String> observer = mock(Observer.class);

fromCallableObservable.subscribe(observer);

verify(observer).onNext("test_value");
verify(observer).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}

@SuppressWarnings("unchecked")
@Test
public void shouldCallOnError() throws Exception {
Callable<Object> func = mock(Callable.class);

Throwable throwable = new IllegalStateException("Test exception");
when(func.call()).thenThrow(throwable);

Observable<Object> fromCallableObservable = Observable.fromCallable(func);

Observer<Object> observer = mock(Observer.class);

fromCallableObservable.subscribe(observer);

verify(observer, never()).onNext(anyObject());
verify(observer, never()).onCompleted();
verify(observer).onError(throwable);
}

@SuppressWarnings("unchecked")
@Test
public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception {
Callable<String> func = mock(Callable.class);

final CountDownLatch funcLatch = new CountDownLatch(1);
final CountDownLatch observerLatch = new CountDownLatch(1);

when(func.call()).thenAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
observerLatch.countDown();

try {
funcLatch.await();
} catch (InterruptedException e) {
// It's okay, unsubscription causes Thread interruption

// Restoring interruption status of the Thread
Thread.currentThread().interrupt();
}

return "should_not_be_delivered";
}
});

Observable<String> fromCallableObservable = Observable.fromCallable(func);

Observer<String> observer = mock(Observer.class);

Subscription subscription = fromCallableObservable
.subscribeOn(computation())
.subscribe(observer);

// Wait until func will be invoked
observerLatch.await();

// Unsubscribing before emission
subscription.unsubscribe();

// Emitting result
funcLatch.countDown();

// func must be invoked
verify(func).call();

// Observer must not be notified at all
verifyZeroInteractions(observer);
}

@SuppressWarnings("unchecked")
@Test
public void shouldAllowToThrowCheckedException() {
final Exception checkedException = new Exception("test exception");

Observable<Object> fromCallableObservable = Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw checkedException;
}
});

Observer<Object> observer = mock(Observer.class);

fromCallableObservable.subscribe(observer);

verify(observer).onError(checkedException);
verifyNoMoreInteractions(observer);
}
}