diff --git a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java index 2f9a595f6d..20421072e5 100644 --- a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java @@ -49,7 +49,7 @@ public Worker createWorker() { @NonNull @Override public Disposable scheduleDirect(@NonNull Runnable run) { - run.run(); + RxJavaPlugins.onSchedule(run).run(); return EmptyDisposable.INSTANCE; } @@ -58,7 +58,7 @@ public Disposable scheduleDirect(@NonNull Runnable run) { public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { try { unit.sleep(delay); - run.run(); + RxJavaPlugins.onSchedule(run).run(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); RxJavaPlugins.onError(ex); diff --git a/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java b/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java index 74c2ecdb0d..b81036a7e0 100644 --- a/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java +++ b/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java @@ -20,6 +20,8 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; import org.junit.Test; import org.mockito.InOrder; import org.mockito.invocation.InvocationOnMock; @@ -41,6 +43,7 @@ public abstract class AbstractSchedulerTests { /** * The scheduler to test. + * * @return the Scheduler instance */ protected abstract Scheduler getScheduler(); @@ -576,6 +579,7 @@ public void schedulePeriodicallyDirectZeroPeriod() throws Exception { try { sd.replace(s.schedulePeriodicallyDirect(new Runnable() { int count; + @Override public void run() { if (++count == 10) { @@ -610,6 +614,7 @@ public void schedulePeriodicallyZeroPeriod() throws Exception { try { sd.replace(w.schedulePeriodically(new Runnable() { int count; + @Override public void run() { if (++count == 10) { @@ -626,4 +631,71 @@ public void run() { } } } + + private void assertRunnableDecorated(Runnable scheduleCall) throws InterruptedException { + try { + final CountDownLatch decoratedCalled = new CountDownLatch(1); + + RxJavaPlugins.setScheduleHandler(new Function() { + @Override + public Runnable apply(final Runnable actual) throws Exception { + return new Runnable() { + @Override + public void run() { + decoratedCalled.countDown(); + actual.run(); + } + }; + } + }); + + scheduleCall.run(); + + assertTrue(decoratedCalled.await(5, TimeUnit.SECONDS)); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test(timeout = 6000) + public void scheduleDirectDecoratesRunnable() throws InterruptedException { + assertRunnableDecorated(new Runnable() { + @Override + public void run() { + getScheduler().scheduleDirect(Functions.EMPTY_RUNNABLE); + } + }); + } + + @Test(timeout = 6000) + public void scheduleDirectWithDelayDecoratesRunnable() throws InterruptedException { + assertRunnableDecorated(new Runnable() { + @Override + public void run() { + getScheduler().scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS); + } + }); + } + + @Test(timeout = 6000) + public void schedulePeriodicallyDirectDecoratesRunnable() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + if (scheduler instanceof TrampolineScheduler) { + // Can't properly stop a trampolined periodic task. + return; + } + + final AtomicReference disposable = new AtomicReference(); + + try { + assertRunnableDecorated(new Runnable() { + @Override + public void run() { + disposable.set(scheduler.schedulePeriodicallyDirect(Functions.EMPTY_RUNNABLE, 1, 10000, TimeUnit.MILLISECONDS)); + } + }); + } finally { + disposable.get().dispose(); + } + } }