Skip to content

Commit

Permalink
2.x: Fix TrampolineScheduler not calling RxJavaPlugins.onSchedule(), …
Browse files Browse the repository at this point in the history
…add tests for all schedulers.
  • Loading branch information
artem-zinnatullin committed Nov 30, 2017
1 parent 860e39e commit 600748c
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Worker createWorker() {
@NonNull
@Override
public Disposable scheduleDirect(@NonNull Runnable run) {
run.run();
RxJavaPlugins.onSchedule(run).run();
return EmptyDisposable.INSTANCE;
}

Expand All @@ -58,7 +58,7 @@ public Disposable scheduleDirect(@NonNull Runnable run) {
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
try {
unit.sleep(delay);
run.run();
RxJavaPlugins.onSchedule(run).run();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
RxJavaPlugins.onError(ex);
Expand Down
72 changes: 72 additions & 0 deletions src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import io.reactivex.internal.functions.Functions;
import io.reactivex.plugins.RxJavaPlugins;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
Expand All @@ -41,6 +43,7 @@ public abstract class AbstractSchedulerTests {

/**
* The scheduler to test.
*
* @return the Scheduler instance
*/
protected abstract Scheduler getScheduler();
Expand Down Expand Up @@ -576,6 +579,7 @@ public void schedulePeriodicallyDirectZeroPeriod() throws Exception {
try {
sd.replace(s.schedulePeriodicallyDirect(new Runnable() {
int count;

@Override
public void run() {
if (++count == 10) {
Expand Down Expand Up @@ -610,6 +614,7 @@ public void schedulePeriodicallyZeroPeriod() throws Exception {
try {
sd.replace(w.schedulePeriodically(new Runnable() {
int count;

@Override
public void run() {
if (++count == 10) {
Expand All @@ -626,4 +631,71 @@ public void run() {
}
}
}

private void assertRunnableDecorated(Runnable scheduleCall) throws InterruptedException {
try {
final CountDownLatch decoratedCalled = new CountDownLatch(1);

RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() {
@Override
public Runnable apply(final Runnable actual) throws Exception {
return new Runnable() {
@Override
public void run() {
decoratedCalled.countDown();
actual.run();
}
};
}
});

scheduleCall.run();

assertTrue(decoratedCalled.await(5, TimeUnit.SECONDS));
} finally {
RxJavaPlugins.reset();
}
}

@Test(timeout = 6000)
public void scheduleDirectDecoratesRunnable() throws InterruptedException {
assertRunnableDecorated(new Runnable() {
@Override
public void run() {
getScheduler().scheduleDirect(Functions.EMPTY_RUNNABLE);
}
});
}

@Test(timeout = 6000)
public void scheduleDirectWithDelayDecoratesRunnable() throws InterruptedException {
assertRunnableDecorated(new Runnable() {
@Override
public void run() {
getScheduler().scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS);
}
});
}

@Test(timeout = 6000)
public void schedulePeriodicallyDirectDecoratesRunnable() throws InterruptedException {
final Scheduler scheduler = getScheduler();
if (scheduler instanceof TrampolineScheduler) {
// Can't properly stop a trampolined periodic task.
return;
}

final AtomicReference<Disposable> disposable = new AtomicReference<Disposable>();

try {
assertRunnableDecorated(new Runnable() {
@Override
public void run() {
disposable.set(scheduler.schedulePeriodicallyDirect(Functions.EMPTY_RUNNABLE, 1, 10000, TimeUnit.MILLISECONDS));
}
});
} finally {
disposable.get().dispose();
}
}
}

0 comments on commit 600748c

Please sign in to comment.