Skip to content

Commit

Permalink
Avoid swallowing errors in Completable
Browse files Browse the repository at this point in the history
Instead, deliver them up to the thread's uncaught exception handler.

Fixes #3726
  • Loading branch information
loganj committed Feb 24, 2016
1 parent a57bccc commit 7309342
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 16 deletions.
14 changes: 12 additions & 2 deletions src/main/java/rx/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1835,6 +1835,8 @@ public void onCompleted() {
public void onError(Throwable e) {
ERROR_HANDLER.handleError(e);
mad.unsubscribe();
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, e);
}

@Override
Expand Down Expand Up @@ -1864,14 +1866,19 @@ public void onCompleted() {
onComplete.call();
} catch (Throwable e) {
ERROR_HANDLER.handleError(e);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, e);
} finally {
mad.unsubscribe();
}
mad.unsubscribe();
}

@Override
public void onError(Throwable e) {
ERROR_HANDLER.handleError(e);
mad.unsubscribe();
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, e);
}

@Override
Expand Down Expand Up @@ -1915,8 +1922,11 @@ public void onError(Throwable e) {
} catch (Throwable ex) {
e = new CompositeException(Arrays.asList(e, ex));
ERROR_HANDLER.handleError(e);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, e);
} finally {
mad.unsubscribe();
}
mad.unsubscribe();
}

@Override
Expand Down
16 changes: 16 additions & 0 deletions src/test/java/rx/CapturingUncaughtExceptionHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package rx;

import java.util.concurrent.CountDownLatch;

public final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
public int count = 0;
public Throwable caught;
public CountDownLatch completed = new CountDownLatch(1);

@Override
public void uncaughtException(Thread t, Throwable e) {
count++;
caught = e;
completed.countDown();
}
}
79 changes: 78 additions & 1 deletion src/test/java/rx/CompletableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2700,7 +2700,64 @@ public void call(CompletableSubscriber s) {

Assert.assertTrue(name.get().startsWith("RxComputation"));
}


@Test(timeout = 1000)
public void subscribeEmptyOnError() {
expectUncaughtTestException(new Action0() {
@Override public void call() {
error.completable.subscribe();
}
});
}

@Test(timeout = 1000)
public void subscribeOneActionOnError() {
expectUncaughtTestException(new Action0() {
@Override
public void call() {
error.completable.subscribe(new Action0() {
@Override
public void call() {
}
});
}
});
}

@Test(timeout = 1000)
public void subscribeOneActionThrowFromOnCompleted() {
expectUncaughtTestException(new Action0() {
@Override
public void call() {
normal.completable.subscribe(new Action0() {
@Override
public void call() {
throw new TestException();
}
});
}
});
}

@Test(timeout = 1000)
public void subscribeTwoActionsThrowFromOnError() {
expectUncaughtTestException(new Action0() {
@Override
public void call() {
error.completable.subscribe(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throw new TestException();
}
}, new Action0() {
@Override
public void call() {
}
});
}
});
}

@Test(timeout = 1000)
public void timeoutEmitError() {
Throwable e = Completable.never().timeout(100, TimeUnit.MILLISECONDS).get();
Expand Down Expand Up @@ -3742,4 +3799,24 @@ public void call(Throwable e) {
assertNotNull("Unsubscribed before the call to onError", subscriptionRef.get());
}

private static void expectUncaughtTestException(Action0 action) {
Thread.UncaughtExceptionHandler originalHandler = Thread.getDefaultUncaughtExceptionHandler();
CapturingUncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler();
Thread.setDefaultUncaughtExceptionHandler(handler);
try {
action.call();
assertEquals("Should have received exactly 1 exception", 1, handler.count);
Throwable caught = handler.caught;
while (caught != null) {
if (caught instanceof TestException) break;
if (caught == caught.getCause()) break;
caught = caught.getCause();
}
assertTrue("A TestException should have been delivered to the handler",
caught instanceof TestException);
} finally {
Thread.setDefaultUncaughtExceptionHandler(originalHandler);
}
}

}
14 changes: 1 addition & 13 deletions src/test/java/rx/schedulers/SchedulerTests.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package rx.schedulers;

import rx.CapturingUncaughtExceptionHandler;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
Expand Down Expand Up @@ -87,19 +88,6 @@ static void testHandledErrorIsNotDeliveredToThreadHandler(Scheduler scheduler) t
}
}

private static final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
int count = 0;
Throwable caught;
CountDownLatch completed = new CountDownLatch(1);

@Override
public void uncaughtException(Thread t, Throwable e) {
count++;
caught = e;
completed.countDown();
}
}

private static final class CapturingObserver<T> implements Observer<T> {
CountDownLatch completed = new CountDownLatch(1);
int errorCount = 0;
Expand Down

0 comments on commit 7309342

Please sign in to comment.