diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index e1e7c1e806..707efd09df 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -101,6 +101,51 @@ public interface Scheduler { */ Subscription schedule(Func0 action, long dueTime, TimeUnit unit); + /** + * Schedules an action to be executed periodically. + * + * @param action The action to execute periodically. + * @param initialDelay Time to wait before executing the action for the first time. + * @param period The time interval to wait each time in between executing the action. + * @param unit The time unit the interval above is given in. + * @return A subscription to be able to unsubscribe from action. + */ + Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit); + + /** + * Schedules a cancelable action to be executed periodically. + * + * @param action The action to execute periodically. + * @param initialDelay Time to wait before executing the action for the first time. + * @param period The time interval to wait each time in between executing the action. + * @param unit The time unit the interval above is given in. + * @return A subscription to be able to unsubscribe from action. + */ + Subscription schedulePeriodically(Func0 action, long initialDelay, long period, TimeUnit unit); + + /** + * Schedules a cancelable action to be executed periodically. + * + * @param action The action to execute periodically. + * @param initialDelay Time to wait before executing the action for the first time. + * @param period The time interval to wait each time in between executing the action. + * @param unit The time unit the interval above is given in. + * @return A subscription to be able to unsubscribe from action. + */ + Subscription schedulePeriodically(Func1 action, long initialDelay, long period, TimeUnit unit); + + /** + * Schedules a cancelable action to be executed periodically. + * + * @param state State to pass into the action. + * @param action The action to execute periodically. + * @param initialDelay Time to wait before executing the action for the first time. + * @param period The time interval to wait each time in between executing the action. + * @param unit The time unit the interval above is given in. + * @return A subscription to be able to unsubscribe from action. + */ + Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit); + /** * Returns the scheduler's notion of current time. */ diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java index d15e8e184a..fc46e41132 100644 --- a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -16,6 +16,7 @@ package rx.concurrency; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import rx.Scheduler; import rx.Subscription; @@ -34,22 +35,12 @@ public Subscription schedule(Action0 action) { @Override public Subscription schedule(final Func1 action) { - return schedule(new Func0() { - @Override - public Subscription call() { - return action.call(AbstractScheduler.this); - } - }); + return schedule(func0ForwardingToFunc1(action)); } @Override public Subscription schedule(final T state, final Func2 action) { - return schedule(new Func0() { - @Override - public Subscription call() { - return action.call(AbstractScheduler.this, state); - } - }); + return schedule(func0ForwardingToFunc2(action, state)); } @Override @@ -59,29 +50,92 @@ public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { @Override public Subscription schedule(final Func1 action, long dueTime, TimeUnit unit) { - return schedule(new Func0() { - @Override - public Subscription call() { - return action.call(AbstractScheduler.this); - } - }, dueTime, unit); + return schedule(func0ForwardingToFunc1(action), dueTime, unit); } @Override public Subscription schedule(final T state, final Func2 action, long dueTime, TimeUnit unit) { - return schedule(new Func0() { + return schedule(func0ForwardingToFunc2(action, state), dueTime, unit); + } + + @Override + public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(asFunc0(action), initialDelay, period, unit); + } + + /** + * This default implementation schedules recursively and waits for actions to complete (instead of potentially executing + * long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this. + */ + @Override + public Subscription schedulePeriodically(final Func0 action, long initialDelay, final long period, final TimeUnit unit) { + final long periodInNanos = unit.toNanos(period); + final AtomicBoolean complete = new AtomicBoolean(); + + final Func0 recursiveAction = new Func0() { @Override public Subscription call() { - return action.call(AbstractScheduler.this, state); + if (! complete.get()) { + long startedAt = System.nanoTime(); + final Subscription sub1 = action.call(); + long timeTakenByActionInNanos = System.nanoTime() - startedAt; + final Subscription sub2 = schedule(this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS); + return Subscriptions.create(new Action0() { + @Override + public void call() { + sub1.unsubscribe(); + sub2.unsubscribe(); + } + }); + } + return Subscriptions.empty(); + } + }; + final Subscription sub = schedule(recursiveAction, initialDelay, unit); + return Subscriptions.create(new Action0() { + @Override + public void call() { + complete.set(true); + sub.unsubscribe(); } - }, dueTime, unit); + }); } - + + @Override + public Subscription schedulePeriodically(Func1 action, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(func0ForwardingToFunc1(action), initialDelay, period, unit); + } + + @Override + public Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(func0ForwardingToFunc2(action, state), initialDelay, period, unit); + } + @Override public long now() { return System.nanoTime(); } + @SuppressWarnings("static-method") // can't be done, of course, but Eclipse fails at detecting AbstractScheduler.this + private Func0 func0ForwardingToFunc1(final Func1 func1) { + return new Func0() { + @Override + public Subscription call() { + return func1.call(AbstractScheduler.this); + } + }; + } + + @SuppressWarnings("static-method") // can't be done, of course, but Eclipse fails at detecting AbstractScheduler.this + private Func0 func0ForwardingToFunc2(final Func2 func2, final T state) { + return new Func0() { + @Override + public Subscription call() { + return func2.call(AbstractScheduler.this, state); + } + }; + } + private static Func0 asFunc0(final Action0 action) { return new Func0() { @Override diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java index 133f772889..2e0f3ae78a 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -15,15 +15,20 @@ */ package rx.concurrency; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import rx.Scheduler; import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Func0; /** @@ -123,4 +128,30 @@ public void run() { } + @Override + public Subscription schedulePeriodically(final Func0 action, long initialDelay, long period, TimeUnit unit) { + final Queue subscriptions = new ConcurrentLinkedQueue(); + if (executor instanceof ScheduledExecutorService) { + final ScheduledFuture future = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + subscriptions.add(action.call()); + } + }, initialDelay, period, unit); + + return Subscriptions.create(new Action0() { + @Override + public void call() { + future.cancel(false); + Subscription next = subscriptions.poll(); + while (next != null) { + next.unsubscribe(); + next = subscriptions.poll(); + } + } + }); + } + // not a scheduled executor service, so we fall back to the recursive implementation + return super.schedulePeriodically(action, initialDelay, period, unit); + } } diff --git a/rxjava-core/src/main/java/rx/operators/Tester.java b/rxjava-core/src/main/java/rx/operators/Tester.java index 11b2c8a798..e209dfc43b 100644 --- a/rxjava-core/src/main/java/rx/operators/Tester.java +++ b/rxjava-core/src/main/java/rx/operators/Tester.java @@ -320,6 +320,26 @@ public Subscription schedule(T state, Func2 acti return underlying.schedule(state, action, dueTime, unit); } + @Override + public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) { + return underlying.schedulePeriodically(action, initialDelay, period, unit); + } + + @Override + public Subscription schedulePeriodically(Func0 action, long initialDelay, long period, TimeUnit unit) { + return underlying.schedulePeriodically(action, initialDelay, period, unit); + } + + @Override + public Subscription schedulePeriodically(Func1 action, long initialDelay, long period, TimeUnit unit) { + return underlying.schedulePeriodically(action, initialDelay, period, unit); + } + + @Override + public Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit) { + return underlying.schedulePeriodically(state, action, initialDelay, period, unit); + } + @Override public long now() { return underlying.now();