diff --git a/src/main/java/io/reactivex/disposables/FutureDisposable.java b/src/main/java/io/reactivex/disposables/FutureDisposable.java index a2e633f9ec..0b3a67722a 100644 --- a/src/main/java/io/reactivex/disposables/FutureDisposable.java +++ b/src/main/java/io/reactivex/disposables/FutureDisposable.java @@ -13,11 +13,12 @@ package io.reactivex.disposables; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; /** * A Disposable container that cancels a Future instance. */ -final class FutureDisposable extends ReferenceDisposable> { +final class FutureDisposable extends AtomicReference> implements Disposable { private static final long serialVersionUID = 6545242830671168775L; @@ -29,7 +30,16 @@ final class FutureDisposable extends ReferenceDisposable> { } @Override - protected void onDisposed(Future value) { - value.cancel(allowInterrupt); + public boolean isDisposed() { + Future f = get(); + return f == null || f.isDone(); + } + + @Override + public void dispose() { + Future f = getAndSet(null); + if (f != null) { + f.cancel(allowInterrupt); + } } } diff --git a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java index 2359f3bc89..3916073a40 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java @@ -61,7 +61,7 @@ public Disposable scheduleDirect(Runnable run) { } @Override - public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { + public Disposable scheduleDirect(Runnable run, final long delay, final TimeUnit unit) { final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); if (executor instanceof ScheduledExecutorService) { try { @@ -72,20 +72,19 @@ public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { return EmptyDisposable.INSTANCE; } } - SequentialDisposable first = new SequentialDisposable(); - final SequentialDisposable mar = new SequentialDisposable(first); + final DelayedRunnable dr = new DelayedRunnable(decoratedRun); Disposable delayed = HELPER.scheduleDirect(new Runnable() { @Override public void run() { - mar.replace(scheduleDirect(decoratedRun)); + dr.direct.replace(scheduleDirect(dr)); } }, delay, unit); - first.replace(delayed); + dr.timed.replace(delayed); - return mar; + return dr; } @Override @@ -253,7 +252,11 @@ public void run() { if (get()) { return; } - actual.run(); + try { + actual.run(); + } finally { + lazySet(true); + } } @Override @@ -266,6 +269,49 @@ public boolean isDisposed() { return get(); } } + + } + + static final class DelayedRunnable extends AtomicReference implements Runnable, Disposable { + + private static final long serialVersionUID = -4101336210206799084L; + + final SequentialDisposable timed; + + final SequentialDisposable direct; + + DelayedRunnable(Runnable run) { + super(run); + this.timed = new SequentialDisposable(); + this.direct = new SequentialDisposable(); + } + + @Override + public void run() { + Runnable r = get(); + if (r != null) { + try { + r.run(); + } finally { + lazySet(null); + timed.lazySet(DisposableHelper.DISPOSED); + direct.lazySet(DisposableHelper.DISPOSED); + } + } + } + + @Override + public boolean isDisposed() { + return get() == null; + } + + @Override + public void dispose() { + if (getAndSet(null) != null) { + timed.dispose(); + direct.dispose(); + } + } } } diff --git a/src/test/java/io/reactivex/internal/schedulers/SingleSchedulerTest.java b/src/test/java/io/reactivex/internal/schedulers/SingleSchedulerTest.java index 8beb2664c8..929e626050 100644 --- a/src/test/java/io/reactivex/internal/schedulers/SingleSchedulerTest.java +++ b/src/test/java/io/reactivex/internal/schedulers/SingleSchedulerTest.java @@ -15,14 +15,16 @@ import static org.junit.Assert.*; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.junit.Test; import io.reactivex.*; import io.reactivex.Scheduler.Worker; -import io.reactivex.disposables.Disposables; +import io.reactivex.disposables.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.schedulers.SingleScheduler.ScheduledWorker; +import io.reactivex.schedulers.Schedulers; public class SingleSchedulerTest { @@ -78,4 +80,41 @@ public void run() { TestHelper.race(r1, r1); } } + + @Test(timeout = 1000) + public void runnableDisposedAsync() throws Exception { + final Scheduler s = Schedulers.single(); + Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE); + + while (!d.isDisposed()) { + Thread.sleep(1); + } + } + + @Test(timeout = 1000) + public void runnableDisposedAsyncCrash() throws Exception { + final Scheduler s = Schedulers.single(); + + Disposable d = s.scheduleDirect(new Runnable() { + @Override + public void run() { + throw new IllegalStateException(); + } + }); + + while (!d.isDisposed()) { + Thread.sleep(1); + } + } + + @Test(timeout = 1000) + public void runnableDisposedAsyncTimed() throws Exception { + final Scheduler s = Schedulers.single(); + + Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS); + + while (!d.isDisposed()) { + Thread.sleep(1); + } + } } diff --git a/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java b/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java index 6a0888954e..a6af466daa 100644 --- a/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java @@ -484,10 +484,81 @@ public void execute(Runnable r) { }); Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE); - assertFalse(d.isDisposed()); + assertTrue(d.isDisposed()); + } - d.dispose(); + @Test(timeout = 1000) + public void runnableDisposedAsync() throws Exception { + final Scheduler s = Schedulers.from(new Executor() { + @Override + public void execute(Runnable r) { + new Thread(r).start(); + } + }); + Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE); - assertTrue(d.isDisposed()); + while (!d.isDisposed()) { + Thread.sleep(1); + } + } + + @Test(timeout = 1000) + public void runnableDisposedAsync2() throws Exception { + final Scheduler s = Schedulers.from(executor); + Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE); + + while (!d.isDisposed()) { + Thread.sleep(1); + } + } + + @Test(timeout = 1000) + public void runnableDisposedAsyncCrash() throws Exception { + final Scheduler s = Schedulers.from(new Executor() { + @Override + public void execute(Runnable r) { + new Thread(r).start(); + } + }); + Disposable d = s.scheduleDirect(new Runnable() { + @Override + public void run() { + throw new IllegalStateException(); + } + }); + + while (!d.isDisposed()) { + Thread.sleep(1); + } + } + + @Test(timeout = 1000) + public void runnableDisposedAsyncTimed() throws Exception { + final Scheduler s = Schedulers.from(new Executor() { + @Override + public void execute(Runnable r) { + new Thread(r).start(); + } + }); + Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS); + + while (!d.isDisposed()) { + Thread.sleep(1); + } + } + + @Test(timeout = 1000) + public void runnableDisposedAsyncTimed2() throws Exception { + ExecutorService executorScheduler = Executors.newScheduledThreadPool(1, new RxThreadFactory("TestCustomPoolTimed")); + try { + final Scheduler s = Schedulers.from(executorScheduler); + Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS); + + while (!d.isDisposed()) { + Thread.sleep(1); + } + } finally { + executorScheduler.shutdownNow(); + } } }