Skip to content

Commit

Permalink
Check isDisposed before emitting in SingleFromCallable (#5743)
Browse files Browse the repository at this point in the history
Previously SingleFromCallable did not check if the subscriber was
unsubscribed before emitting onSuccess or onError. This fixes that
behavior and adds tests to SingleFromCallable, CompletableFromCallable,
and MaybeFromCallable.

Fixes #5742
  • Loading branch information
runningcode authored and akarnokd committed Nov 25, 2017
1 parent e25be7c commit 9564121
Show file tree
Hide file tree
Showing 4 changed files with 373 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
import java.util.concurrent.Callable;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class SingleFromCallable<T> extends Single<T> {

Expand All @@ -28,20 +32,29 @@ public SingleFromCallable(Callable<? extends T> callable) {
}

@Override
protected void subscribeActual(SingleObserver<? super T> s) {
protected void subscribeActual(SingleObserver<? super T> observer) {
Disposable d = Disposables.empty();
observer.onSubscribe(d);

if (d.isDisposed()) {
return;
}
T value;

s.onSubscribe(EmptyDisposable.INSTANCE);
try {
T v = callable.call();
if (v != null) {
s.onSuccess(v);
value = ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (!d.isDisposed()) {
observer.onError(ex);
} else {
s.onError(new NullPointerException("The callable returned a null value"));
RxJavaPlugins.onError(ex);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.onError(e);
return;
}
}

if (!d.isDisposed()) {
observer.onSuccess(value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,22 @@

import io.reactivex.Completable;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.TestHelper;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

public class CompletableFromCallableTest {
@Test(expected = NullPointerException.class)
Expand Down Expand Up @@ -100,4 +112,57 @@ public Object call() throws Exception {
.test()
.assertFailure(UnsupportedOperationException.class);
}

@SuppressWarnings("unchecked")
@Test
public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception {
Callable<String> func = mock(Callable.class);

final CountDownLatch funcLatch = new CountDownLatch(1);
final CountDownLatch observerLatch = new CountDownLatch(1);

when(func.call()).thenAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
observerLatch.countDown();

try {
funcLatch.await();
} catch (InterruptedException e) {
// It's okay, unsubscription causes Thread interruption

// Restoring interruption status of the Thread
Thread.currentThread().interrupt();
}

return "should_not_be_delivered";
}
});

Completable fromCallableObservable = Completable.fromCallable(func);

Observer<Object> observer = TestHelper.mockObserver();

TestObserver<String> outer = new TestObserver<String>(observer);

fromCallableObservable
.subscribeOn(Schedulers.computation())
.subscribe(outer);

// Wait until func will be invoked
observerLatch.await();

// Unsubscribing before emission
outer.cancel();

// Emitting result
funcLatch.countDown();

// func must be invoked
verify(func).call();

// Observer must not be notified at all
verify(observer).onSubscribe(any(Disposable.class));
verifyNoMoreInteractions(observer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@
package io.reactivex.internal.operators.maybe;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.disposables.Disposable;
import org.junit.Test;

import io.reactivex.*;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class MaybeFromCallableTest {
@Test(expected = NullPointerException.class)
Expand Down Expand Up @@ -158,4 +163,57 @@ public Integer call() throws Exception {
RxJavaPlugins.reset();
}
}

@SuppressWarnings("unchecked")
@Test
public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception {
Callable<String> func = mock(Callable.class);

final CountDownLatch funcLatch = new CountDownLatch(1);
final CountDownLatch observerLatch = new CountDownLatch(1);

when(func.call()).thenAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
observerLatch.countDown();

try {
funcLatch.await();
} catch (InterruptedException e) {
// It's okay, unsubscription causes Thread interruption

// Restoring interruption status of the Thread
Thread.currentThread().interrupt();
}

return "should_not_be_delivered";
}
});

Maybe<String> fromCallableObservable = Maybe.fromCallable(func);

Observer<Object> observer = TestHelper.mockObserver();

TestObserver<String> outer = new TestObserver<String>(observer);

fromCallableObservable
.subscribeOn(Schedulers.computation())
.subscribe(outer);

// Wait until func will be invoked
observerLatch.await();

// Unsubscribing before emission
outer.cancel();

// Emitting result
funcLatch.countDown();

// func must be invoked
verify(func).call();

// Observer must not be notified at all
verify(observer).onSubscribe(any(Disposable.class));
verifyNoMoreInteractions(observer);
}
}
Loading

0 comments on commit 9564121

Please sign in to comment.