Skip to content

Commit

Permalink
2.x: ExecutorScheduler.scheduleDirect to report isDisposed on complete
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jan 18, 2017
1 parent 9c34eb1 commit dd0c96d
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 15 deletions.
16 changes: 13 additions & 3 deletions src/main/java/io/reactivex/disposables/FutureDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
package io.reactivex.disposables;

import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

/**
* A Disposable container that cancels a Future instance.
*/
final class FutureDisposable extends ReferenceDisposable<Future<?>> {
final class FutureDisposable extends AtomicReference<Future<?>> implements Disposable {

private static final long serialVersionUID = 6545242830671168775L;

Expand All @@ -29,7 +30,16 @@ final class FutureDisposable extends ReferenceDisposable<Future<?>> {
}

@Override
protected void onDisposed(Future<?> value) {
value.cancel(allowInterrupt);
public boolean isDisposed() {
Future<?> f = get();
return f == null || f.isDone();
}

@Override
public void dispose() {
Future<?> f = getAndSet(null);
if (f != null) {
f.cancel(allowInterrupt);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Disposable scheduleDirect(Runnable run) {
}

@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
public Disposable scheduleDirect(Runnable run, final long delay, final TimeUnit unit) {
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
if (executor instanceof ScheduledExecutorService) {
try {
Expand All @@ -72,20 +72,19 @@ public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
return EmptyDisposable.INSTANCE;
}
}
SequentialDisposable first = new SequentialDisposable();

final SequentialDisposable mar = new SequentialDisposable(first);
final DelayedRunnable dr = new DelayedRunnable(decoratedRun);

Disposable delayed = HELPER.scheduleDirect(new Runnable() {
@Override
public void run() {
mar.replace(scheduleDirect(decoratedRun));
dr.direct.replace(scheduleDirect(dr));
}
}, delay, unit);

first.replace(delayed);
dr.timed.replace(delayed);

return mar;
return dr;
}

@Override
Expand Down Expand Up @@ -253,7 +252,11 @@ public void run() {
if (get()) {
return;
}
actual.run();
try {
actual.run();
} finally {
lazySet(true);
}
}

@Override
Expand All @@ -266,6 +269,49 @@ public boolean isDisposed() {
return get();
}
}

}

static final class DelayedRunnable extends AtomicReference<Runnable> implements Runnable, Disposable {

private static final long serialVersionUID = -4101336210206799084L;

final SequentialDisposable timed;

final SequentialDisposable direct;

DelayedRunnable(Runnable run) {
super(run);
this.timed = new SequentialDisposable();
this.direct = new SequentialDisposable();
}

@Override
public void run() {
Runnable r = get();
if (r != null) {
try {
r.run();
} finally {
lazySet(null);
timed.lazySet(DisposableHelper.DISPOSED);
direct.lazySet(DisposableHelper.DISPOSED);
}
}
}

@Override
public boolean isDisposed() {
return get() == null;
}

@Override
public void dispose() {
if (getAndSet(null) != null) {
timed.dispose();
direct.dispose();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@

import static org.junit.Assert.*;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.Scheduler.Worker;
import io.reactivex.disposables.Disposables;
import io.reactivex.disposables.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.schedulers.SingleScheduler.ScheduledWorker;
import io.reactivex.schedulers.Schedulers;

public class SingleSchedulerTest {

Expand Down Expand Up @@ -78,4 +80,41 @@ public void run() {
TestHelper.race(r1, r1);
}
}

@Test(timeout = 1000)
public void runnableDisposedAsync() throws Exception {
final Scheduler s = Schedulers.single();
Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE);

while (!d.isDisposed()) {
Thread.sleep(1);
}
}

@Test(timeout = 1000)
public void runnableDisposedAsyncCrash() throws Exception {
final Scheduler s = Schedulers.single();

Disposable d = s.scheduleDirect(new Runnable() {
@Override
public void run() {
throw new IllegalStateException();
}
});

while (!d.isDisposed()) {
Thread.sleep(1);
}
}

@Test(timeout = 1000)
public void runnableDisposedAsyncTimed() throws Exception {
final Scheduler s = Schedulers.single();

Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS);

while (!d.isDisposed()) {
Thread.sleep(1);
}
}
}
77 changes: 74 additions & 3 deletions src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -484,10 +484,81 @@ public void execute(Runnable r) {
});
Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE);

assertFalse(d.isDisposed());
assertTrue(d.isDisposed());
}

d.dispose();
@Test(timeout = 1000)
public void runnableDisposedAsync() throws Exception {
final Scheduler s = Schedulers.from(new Executor() {
@Override
public void execute(Runnable r) {
new Thread(r).start();
}
});
Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE);

assertTrue(d.isDisposed());
while (!d.isDisposed()) {
Thread.sleep(1);
}
}

@Test(timeout = 1000)
public void runnableDisposedAsync2() throws Exception {
final Scheduler s = Schedulers.from(executor);
Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE);

while (!d.isDisposed()) {
Thread.sleep(1);
}
}

@Test(timeout = 1000)
public void runnableDisposedAsyncCrash() throws Exception {
final Scheduler s = Schedulers.from(new Executor() {
@Override
public void execute(Runnable r) {
new Thread(r).start();
}
});
Disposable d = s.scheduleDirect(new Runnable() {
@Override
public void run() {
throw new IllegalStateException();
}
});

while (!d.isDisposed()) {
Thread.sleep(1);
}
}

@Test(timeout = 1000)
public void runnableDisposedAsyncTimed() throws Exception {
final Scheduler s = Schedulers.from(new Executor() {
@Override
public void execute(Runnable r) {
new Thread(r).start();
}
});
Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS);

while (!d.isDisposed()) {
Thread.sleep(1);
}
}

@Test(timeout = 1000)
public void runnableDisposedAsyncTimed2() throws Exception {
ExecutorService executorScheduler = Executors.newScheduledThreadPool(1, new RxThreadFactory("TestCustomPoolTimed"));
try {
final Scheduler s = Schedulers.from(executorScheduler);
Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS);

while (!d.isDisposed()) {
Thread.sleep(1);
}
} finally {
executorScheduler.shutdownNow();
}
}
}

0 comments on commit dd0c96d

Please sign in to comment.