Skip to content

Commit

Permalink
initial attempt at implementing periodic schedulers as discussed in R…
Browse files Browse the repository at this point in the history
…eactiveX#228, needs some testing next...
  • Loading branch information
jmhofer committed Apr 12, 2013
1 parent 7834e8a commit 78f7dd8
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 22 deletions.
45 changes: 45 additions & 0 deletions rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,51 @@ public interface Scheduler {
*/
Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit);

/**
* Schedules an action to be executed periodically.
*
* @param action The action to execute periodically.
* @param initialDelay Time to wait before executing the action for the first time.
* @param period The time interval to wait each time in between executing the action.
* @param unit The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit);

/**
* Schedules a cancelable action to be executed periodically.
*
* @param action The action to execute periodically.
* @param initialDelay Time to wait before executing the action for the first time.
* @param period The time interval to wait each time in between executing the action.
* @param unit The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
Subscription schedulePeriodically(Func0<Subscription> action, long initialDelay, long period, TimeUnit unit);

/**
* Schedules a cancelable action to be executed periodically.
*
* @param action The action to execute periodically.
* @param initialDelay Time to wait before executing the action for the first time.
* @param period The time interval to wait each time in between executing the action.
* @param unit The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
Subscription schedulePeriodically(Func1<Scheduler, Subscription> action, long initialDelay, long period, TimeUnit unit);

/**
* Schedules a cancelable action to be executed periodically.
*
* @param state State to pass into the action.
* @param action The action to execute periodically.
* @param initialDelay Time to wait before executing the action for the first time.
* @param period The time interval to wait each time in between executing the action.
* @param unit The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
<T> Subscription schedulePeriodically(T state, Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit);

/**
* Returns the scheduler's notion of current time.
*/
Expand Down
98 changes: 76 additions & 22 deletions rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.concurrency;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Scheduler;
import rx.Subscription;
Expand All @@ -34,22 +35,12 @@ public Subscription schedule(Action0 action) {

@Override
public Subscription schedule(final Func1<Scheduler, Subscription> action) {
return schedule(new Func0<Subscription>() {
@Override
public Subscription call() {
return action.call(AbstractScheduler.this);
}
});
return schedule(func0ForwardingToFunc1(action));
}

@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
return schedule(new Func0<Subscription>() {
@Override
public Subscription call() {
return action.call(AbstractScheduler.this, state);
}
});
return schedule(func0ForwardingToFunc2(action, state));
}

@Override
Expand All @@ -59,29 +50,92 @@ public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {

@Override
public Subscription schedule(final Func1<Scheduler, Subscription> action, long dueTime, TimeUnit unit) {
return schedule(new Func0<Subscription>() {
@Override
public Subscription call() {
return action.call(AbstractScheduler.this);
}
}, dueTime, unit);
return schedule(func0ForwardingToFunc1(action), dueTime, unit);
}

@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
return schedule(new Func0<Subscription>() {
return schedule(func0ForwardingToFunc2(action, state), dueTime, unit);
}

@Override
public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) {
return schedulePeriodically(asFunc0(action), initialDelay, period, unit);
}

/**
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
*/
@Override
public Subscription schedulePeriodically(final Func0<Subscription> action, long initialDelay, final long period, final TimeUnit unit) {
final long periodInNanos = unit.toNanos(period);
final AtomicBoolean complete = new AtomicBoolean();

final Func0<Subscription> recursiveAction = new Func0<Subscription>() {
@Override
public Subscription call() {
return action.call(AbstractScheduler.this, state);
if (! complete.get()) {
long startedAt = System.nanoTime();
final Subscription sub1 = action.call();
long timeTakenByActionInNanos = System.nanoTime() - startedAt;
final Subscription sub2 = schedule(this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS);
return Subscriptions.create(new Action0() {
@Override
public void call() {
sub1.unsubscribe();
sub2.unsubscribe();
}
});
}
return Subscriptions.empty();
}
};
final Subscription sub = schedule(recursiveAction, initialDelay, unit);
return Subscriptions.create(new Action0() {
@Override
public void call() {
complete.set(true);
sub.unsubscribe();
}
}, dueTime, unit);
});
}


@Override
public Subscription schedulePeriodically(Func1<Scheduler, Subscription> action, long initialDelay, long period, TimeUnit unit) {
return schedulePeriodically(func0ForwardingToFunc1(action), initialDelay, period, unit);
}

@Override
public <T> Subscription schedulePeriodically(T state, Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
return schedulePeriodically(func0ForwardingToFunc2(action, state), initialDelay, period, unit);
}

@Override
public long now() {
return System.nanoTime();
}

@SuppressWarnings("static-method") // can't be done, of course, but Eclipse fails at detecting AbstractScheduler.this
private Func0<Subscription> func0ForwardingToFunc1(final Func1<Scheduler, Subscription> func1) {
return new Func0<Subscription>() {
@Override
public Subscription call() {
return func1.call(AbstractScheduler.this);
}
};
}

@SuppressWarnings("static-method") // can't be done, of course, but Eclipse fails at detecting AbstractScheduler.this
private <T> Func0<Subscription> func0ForwardingToFunc2(final Func2<Scheduler, T, Subscription> func2, final T state) {
return new Func0<Subscription>() {
@Override
public Subscription call() {
return func2.call(AbstractScheduler.this, state);
}
};
}

private static Func0<Subscription> asFunc0(final Action0 action) {
return new Func0<Subscription>() {
@Override
Expand Down
31 changes: 31 additions & 0 deletions rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@
*/
package rx.concurrency;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;

/**
Expand Down Expand Up @@ -123,4 +128,30 @@ public void run() {

}

@Override
public Subscription schedulePeriodically(final Func0<Subscription> action, long initialDelay, long period, TimeUnit unit) {
final Queue<Subscription> subscriptions = new ConcurrentLinkedQueue<Subscription>();
if (executor instanceof ScheduledExecutorService) {
final ScheduledFuture<?> future = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
subscriptions.add(action.call());
}
}, initialDelay, period, unit);

return Subscriptions.create(new Action0() {
@Override
public void call() {
future.cancel(false);
Subscription next = subscriptions.poll();
while (next != null) {
next.unsubscribe();
next = subscriptions.poll();
}
}
});
}
// not a scheduled executor service, so we fall back to the recursive implementation
return super.schedulePeriodically(action, initialDelay, period, unit);
}
}
20 changes: 20 additions & 0 deletions rxjava-core/src/main/java/rx/operators/Tester.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,26 @@ public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> acti
return underlying.schedule(state, action, dueTime, unit);
}

@Override
public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) {
return underlying.schedulePeriodically(action, initialDelay, period, unit);
}

@Override
public Subscription schedulePeriodically(Func0<Subscription> action, long initialDelay, long period, TimeUnit unit) {
return underlying.schedulePeriodically(action, initialDelay, period, unit);
}

@Override
public Subscription schedulePeriodically(Func1<Scheduler, Subscription> action, long initialDelay, long period, TimeUnit unit) {
return underlying.schedulePeriodically(action, initialDelay, period, unit);
}

@Override
public <T> Subscription schedulePeriodically(T state, Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
return underlying.schedulePeriodically(state, action, initialDelay, period, unit);
}

@Override
public long now() {
return underlying.now();
Expand Down

0 comments on commit 78f7dd8

Please sign in to comment.