Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check unsubscribe within observable from future #1291

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public void call() {
}
}));
try {
//don't block or propagate CancellationException if already unsubscribed
if (subscriber.isUnsubscribed()) {
return;
}
T value = (unit == null) ? (T) that.get() : (T) that.get(time, unit);
subscriber.onNext(value);
subscriber.onCompleted();
Expand All @@ -71,6 +75,10 @@ public void call() {
// since it's already subscribed.
// If the Future is canceled in other place, CancellationException will be still
// passed to the final Subscriber.
if (subscriber.isUnsubscribed()) {
//refuse to emit onError if already unsubscribed
return;
}
subscriber.onError(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,30 @@
*/
package rx.operators;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.observers.TestObserver;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class OnSubscribeToObservableFutureTest {

Expand Down Expand Up @@ -68,4 +77,64 @@ public void testFailure() throws Exception {
verify(o, times(1)).onError(e);
verify(future, times(1)).cancel(true);
}

@Test
public void testCancelledBeforeSubscribe() throws Exception {
Future<Object> future = mock(Future.class);
CancellationException e = new CancellationException("unit test synthetic cancellation");
when(future.get()).thenThrow(e);
Observer<Object> o = mock(Observer.class);

TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>(o);
testSubscriber.unsubscribe();
Subscription sub = Observable.from(future).subscribe(testSubscriber);
assertEquals(0, testSubscriber.getOnErrorEvents().size());
assertEquals(0, testSubscriber.getOnCompletedEvents().size());
}

@Test
public void testCancellationDuringFutureGet() throws Exception {
Future<Object> future = new Future<Object>() {
private AtomicBoolean isCancelled = new AtomicBoolean(false);
private AtomicBoolean isDone = new AtomicBoolean(false);

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
isCancelled.compareAndSet(false, true);
return true;
}

@Override
public boolean isCancelled() {
return isCancelled.get();
}

@Override
public boolean isDone() {
return isCancelled() || isDone.get();
}

@Override
public Object get() throws InterruptedException, ExecutionException {
Thread.sleep(500);
isDone.compareAndSet(false, true);
return "foo";
}

@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return get();
}
};

Observer<Object> o = mock(Observer.class);

TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>(o);
Observable<Object> futureObservable = Observable.from(future);
Subscription sub = futureObservable.subscribeOn(Schedulers.computation()).subscribe(testSubscriber);
sub.unsubscribe();
assertEquals(0, testSubscriber.getOnErrorEvents().size());
assertEquals(0, testSubscriber.getOnCompletedEvents().size());
assertEquals(0, testSubscriber.getOnNextEvents().size());
}
}