diff --git a/rxjava-core/src/main/java/rx/operators/OperatorToObservableFuture.java b/rxjava-core/src/main/java/rx/operators/OperatorToObservableFuture.java index e263ff5023..869411bd28 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorToObservableFuture.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorToObservableFuture.java @@ -62,6 +62,10 @@ public void call() { } })); try { + //don't block or propagate CancellationException if already unsubscribed + if (subscriber.isUnsubscribed()) { + return; + } T value = (unit == null) ? (T) that.get() : (T) that.get(time, unit); subscriber.onNext(value); subscriber.onCompleted(); @@ -71,6 +75,10 @@ public void call() { // since it's already subscribed. // If the Future is canceled in other place, CancellationException will be still // passed to the final Subscriber. + if (subscriber.isUnsubscribed()) { + //refuse to emit onError if already unsubscribed + return; + } subscriber.onError(e); } } diff --git a/rxjava-core/src/test/java/rx/operators/OperatorToObservableFutureTest.java b/rxjava-core/src/test/java/rx/operators/OperatorToObservableFutureTest.java index fa5e2ec2fe..676adff915 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorToObservableFutureTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorToObservableFutureTest.java @@ -15,6 +15,7 @@ */ package rx.operators; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -22,14 +23,22 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import rx.Observable; import rx.Observer; +import rx.Subscriber; import rx.Subscription; import rx.observers.TestObserver; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; public class OperatorToObservableFutureTest { @@ -68,4 +77,64 @@ public void testFailure() throws Exception { verify(o, times(1)).onError(e); verify(future, times(1)).cancel(true); } + + @Test + public void testCancelledBeforeSubscribe() throws Exception { + Future future = mock(Future.class); + CancellationException e = new CancellationException("unit test synthetic cancellation"); + when(future.get()).thenThrow(e); + Observer o = mock(Observer.class); + + TestSubscriber testSubscriber = new TestSubscriber(o); + testSubscriber.unsubscribe(); + Subscription sub = Observable.from(future).subscribe(testSubscriber); + assertEquals(0, testSubscriber.getOnErrorEvents().size()); + assertEquals(0, testSubscriber.getOnCompletedEvents().size()); + } + + @Test + public void testCancellationDuringFutureGet() throws Exception { + Future future = new Future() { + private AtomicBoolean isCancelled = new AtomicBoolean(false); + private AtomicBoolean isDone = new AtomicBoolean(false); + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + isCancelled.compareAndSet(false, true); + return true; + } + + @Override + public boolean isCancelled() { + return isCancelled.get(); + } + + @Override + public boolean isDone() { + return isCancelled() || isDone.get(); + } + + @Override + public Object get() throws InterruptedException, ExecutionException { + Thread.sleep(500); + isDone.compareAndSet(false, true); + return "foo"; + } + + @Override + public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return get(); + } + }; + + Observer o = mock(Observer.class); + + TestSubscriber testSubscriber = new TestSubscriber(o); + Observable futureObservable = Observable.from(future); + Subscription sub = futureObservable.subscribeOn(Schedulers.computation()).subscribe(testSubscriber); + sub.unsubscribe(); + assertEquals(0, testSubscriber.getOnErrorEvents().size()); + assertEquals(0, testSubscriber.getOnCompletedEvents().size()); + assertEquals(0, testSubscriber.getOnNextEvents().size()); + } }