diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleFromCallable.java b/src/main/java/io/reactivex/internal/operators/single/SingleFromCallable.java index 588e4bc3c1..80c265f6c9 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleFromCallable.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleFromCallable.java @@ -16,8 +16,12 @@ import java.util.concurrent.Callable; import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.Exceptions; import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.plugins.RxJavaPlugins; public final class SingleFromCallable extends Single { @@ -28,20 +32,29 @@ public SingleFromCallable(Callable callable) { } @Override - protected void subscribeActual(SingleObserver s) { + protected void subscribeActual(SingleObserver observer) { + Disposable d = Disposables.empty(); + observer.onSubscribe(d); + + if (d.isDisposed()) { + return; + } + T value; - s.onSubscribe(EmptyDisposable.INSTANCE); try { - T v = callable.call(); - if (v != null) { - s.onSuccess(v); + value = ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + if (!d.isDisposed()) { + observer.onError(ex); } else { - s.onError(new NullPointerException("The callable returned a null value")); + RxJavaPlugins.onError(ex); } - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - s.onError(e); + return; } - } + if (!d.isDisposed()) { + observer.onSuccess(value); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableFromCallableTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableFromCallableTest.java index 64142918bc..5d68b8b59e 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableFromCallableTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableFromCallableTest.java @@ -15,10 +15,22 @@ import io.reactivex.Completable; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; + +import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.TestHelper; +import io.reactivex.disposables.Disposable; +import io.reactivex.observers.TestObserver; +import io.reactivex.schedulers.Schedulers; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; public class CompletableFromCallableTest { @Test(expected = NullPointerException.class) @@ -100,4 +112,57 @@ public Object call() throws Exception { .test() .assertFailure(UnsupportedOperationException.class); } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception { + Callable func = mock(Callable.class); + + final CountDownLatch funcLatch = new CountDownLatch(1); + final CountDownLatch observerLatch = new CountDownLatch(1); + + when(func.call()).thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + observerLatch.countDown(); + + try { + funcLatch.await(); + } catch (InterruptedException e) { + // It's okay, unsubscription causes Thread interruption + + // Restoring interruption status of the Thread + Thread.currentThread().interrupt(); + } + + return "should_not_be_delivered"; + } + }); + + Completable fromCallableObservable = Completable.fromCallable(func); + + Observer observer = TestHelper.mockObserver(); + + TestObserver outer = new TestObserver(observer); + + fromCallableObservable + .subscribeOn(Schedulers.computation()) + .subscribe(outer); + + // Wait until func will be invoked + observerLatch.await(); + + // Unsubscribing before emission + outer.cancel(); + + // Emitting result + funcLatch.countDown(); + + // func must be invoked + verify(func).call(); + + // Observer must not be notified at all + verify(observer).onSubscribe(any(Disposable.class)); + verifyNoMoreInteractions(observer); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromCallableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromCallableTest.java index 0b8d1df77d..7a56347a32 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromCallableTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromCallableTest.java @@ -14,17 +14,22 @@ package io.reactivex.internal.operators.maybe; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import io.reactivex.disposables.Disposable; import org.junit.Test; import io.reactivex.*; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class MaybeFromCallableTest { @Test(expected = NullPointerException.class) @@ -158,4 +163,57 @@ public Integer call() throws Exception { RxJavaPlugins.reset(); } } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception { + Callable func = mock(Callable.class); + + final CountDownLatch funcLatch = new CountDownLatch(1); + final CountDownLatch observerLatch = new CountDownLatch(1); + + when(func.call()).thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + observerLatch.countDown(); + + try { + funcLatch.await(); + } catch (InterruptedException e) { + // It's okay, unsubscription causes Thread interruption + + // Restoring interruption status of the Thread + Thread.currentThread().interrupt(); + } + + return "should_not_be_delivered"; + } + }); + + Maybe fromCallableObservable = Maybe.fromCallable(func); + + Observer observer = TestHelper.mockObserver(); + + TestObserver outer = new TestObserver(observer); + + fromCallableObservable + .subscribeOn(Schedulers.computation()) + .subscribe(outer); + + // Wait until func will be invoked + observerLatch.await(); + + // Unsubscribing before emission + outer.cancel(); + + // Emitting result + funcLatch.countDown(); + + // func must be invoked + verify(func).call(); + + // Observer must not be notified at all + verify(observer).onSubscribe(any(Disposable.class)); + verifyNoMoreInteractions(observer); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFromCallableTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFromCallableTest.java index 6c753fafbf..ebd0445358 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleFromCallableTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFromCallableTest.java @@ -13,11 +13,31 @@ package io.reactivex.internal.operators.single; +import io.reactivex.Observer; import io.reactivex.Single; -import java.util.concurrent.Callable; +import io.reactivex.SingleObserver; +import io.reactivex.TestHelper; +import io.reactivex.disposables.Disposable; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; public class SingleFromCallableTest { + @Test public void fromCallableValue() { Single.fromCallable(new Callable() { @@ -50,4 +70,210 @@ public void fromCallableNull() { .test() .assertFailureAndMessage(NullPointerException.class, "The callable returned a null value"); } + + @Test + public void fromCallableTwice() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Callable callable = new Callable() { + @Override + public Integer call() throws Exception { + return atomicInteger.incrementAndGet(); + } + }; + + Single.fromCallable(callable) + .test() + .assertResult(1); + + assertEquals(1, atomicInteger.get()); + + Single.fromCallable(callable) + .test() + .assertResult(2); + + assertEquals(2, atomicInteger.get()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotInvokeFuncUntilSubscription() throws Exception { + Callable func = mock(Callable.class); + + when(func.call()).thenReturn(new Object()); + + Single fromCallableSingle = Single.fromCallable(func); + + verifyZeroInteractions(func); + + fromCallableSingle.subscribe(); + + verify(func).call(); + } + + + @Test + public void noErrorLoss() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + final CountDownLatch cdl1 = new CountDownLatch(1); + final CountDownLatch cdl2 = new CountDownLatch(1); + + TestObserver to = Single.fromCallable(new Callable() { + @Override + public Integer call() throws Exception { + cdl1.countDown(); + cdl2.await(5, TimeUnit.SECONDS); + return 1; + } + }).subscribeOn(Schedulers.single()).test(); + + assertTrue(cdl1.await(5, TimeUnit.SECONDS)); + + to.cancel(); + + int timeout = 10; + + while (timeout-- > 0 && errors.isEmpty()) { + Thread.sleep(100); + } + + TestHelper.assertUndeliverable(errors, 0, InterruptedException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception { + Callable func = mock(Callable.class); + + final CountDownLatch funcLatch = new CountDownLatch(1); + final CountDownLatch observerLatch = new CountDownLatch(1); + + when(func.call()).thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + observerLatch.countDown(); + + try { + funcLatch.await(); + } catch (InterruptedException e) { + // It's okay, unsubscription causes Thread interruption + + // Restoring interruption status of the Thread + Thread.currentThread().interrupt(); + } + + return "should_not_be_delivered"; + } + }); + + Single fromCallableObservable = Single.fromCallable(func); + + Observer observer = TestHelper.mockObserver(); + + TestObserver outer = new TestObserver(observer); + + fromCallableObservable + .subscribeOn(Schedulers.computation()) + .subscribe(outer); + + // Wait until func will be invoked + observerLatch.await(); + + // Unsubscribing before emission + outer.cancel(); + + // Emitting result + funcLatch.countDown(); + + // func must be invoked + verify(func).call(); + + // Observer must not be notified at all + verify(observer).onSubscribe(any(Disposable.class)); + verifyNoMoreInteractions(observer); + } + + @Test + public void shouldAllowToThrowCheckedException() { + final Exception checkedException = new Exception("test exception"); + + Single fromCallableObservable = Single.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + throw checkedException; + } + }); + + SingleObserver observer = TestHelper.mockSingleObserver(); + + fromCallableObservable.subscribe(observer); + + verify(observer).onSubscribe(any(Disposable.class)); + verify(observer).onError(checkedException); + verifyNoMoreInteractions(observer); + } + + @Test + public void disposedOnArrival() { + final int[] count = { 0 }; + Single.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + count[0]++; + return 1; + } + }) + .test(true) + .assertEmpty(); + + assertEquals(0, count[0]); + } + + @Test + public void disposedOnCall() { + final TestObserver to = new TestObserver(); + + Single.fromCallable(new Callable() { + @Override + public Integer call() throws Exception { + to.cancel(); + return 1; + } + }) + .subscribe(to); + + to.assertEmpty(); + } + + @Test + public void toObservableTake() { + Single.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }) + .toObservable() + .take(1) + .test() + .assertResult(1); + } + + @Test + public void toObservableAndBack() { + Single.fromCallable(new Callable() { + @Override + public Integer call() throws Exception { + return 1; + } + }) + .toObservable() + .singleOrError() + .test() + .assertResult(1); + } }