From c4f54087419d3b1e4587a939cec638f6743220c1 Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Mon, 29 Feb 2016 16:07:51 -0800 Subject: [PATCH 1/3] fix(FutureAction): add support for periodic scheduling with setInterval instead of setTimeout --- src/scheduler/FutureAction.ts | 83 ++++++++++++++++++++++++++++++++--- 1 file changed, 76 insertions(+), 7 deletions(-) diff --git a/src/scheduler/FutureAction.ts b/src/scheduler/FutureAction.ts index 819b891854..fa778e0b01 100644 --- a/src/scheduler/FutureAction.ts +++ b/src/scheduler/FutureAction.ts @@ -8,6 +8,7 @@ export class FutureAction extends Subscription implements Action { public id: number; public state: T; public delay: number; + private pending: boolean = false; constructor(public scheduler: Scheduler, public work: (x?: T) => Subscription | void) { @@ -17,8 +18,14 @@ export class FutureAction extends Subscription implements Action { execute() { if (this.isUnsubscribed) { throw new Error('How did did we execute a canceled Action?'); + } else { + try { + this.work(this.state); + } catch (e) { + this.unsubscribe(); + throw e; + } } - this.work(this.state); } schedule(state?: T, delay: number = 0): Action { @@ -30,20 +37,81 @@ export class FutureAction extends Subscription implements Action { protected _schedule(state?: T, delay: number = 0): Action { - this.delay = delay; + // Always replace the current state with the new state. this.state = state; + + // Set the pending flag indicating that this action has been scheduled, or + // has recursively rescheduled itself. + this.pending = true; + const id = this.id; + // If this action has an intervalID and the specified delay matches the + // delay we used to create the intervalID, don't call `setInterval` again. + if (id != null && this.delay === delay) { + return this; + } + this.delay = delay; + + // If this action has an intervalID, but was rescheduled with a different + // `delay` time, cancel the current intervalID and call `setInterval` with + // the new `delay` time. if (id != null) { this.id = null; - root.clearTimeout(id); + root.clearInterval(id); } - this.id = root.setTimeout(() => { - this.id = null; - const {scheduler} = this; + // + // Important implementation note: + // + // By default, FutureAction only executes once. However, Actions have the + // ability to be rescheduled from within the scheduled callback (mimicking + // recursion for asynchronous methods). This allows us to implement single + // and repeated actions with the same code path without adding API surface + // area, and implement tail-call optimization over asynchronous boundaries. + // + // However, JS runtimes make a distinction between intervals scheduled by + // repeatedly calling `setTimeout` vs. a single `setInterval` call, with + // the latter providing a better guarantee of precision. + // + // In order to accommodate both single and repeatedly rescheduled actions, + // use `setInterval` here for both cases. By default, the interval will be + // canceled after its first execution, or if the action schedules itself to + // run again with a different `delay` time. + // + // If the action recursively schedules itself to run again with the same + // `delay` time, the interval is not canceled, but allowed to loop again. + // The check of whether the interval should be canceled or not is run every + // time the interval is executed. The first time an action fails to + // reschedule itself, the interval is canceled. + // + this.id = root.setInterval(() => { + + this.pending = false; + const {id, scheduler} = this; scheduler.actions.push(this); scheduler.flush(); + + // + // Terminate this interval if the action didn't reschedule itself. + // Don't call `this.unsubscribe()` here, because the action could be + // rescheduled later. For example: + // + // ``` + // scheduler.schedule(function doWork(counter) { + // /* ... I'm a busy worker bee ... */ + // var originalAction = this; + // /* wait 100ms before rescheduling this action again */ + // setTimeout(function () { + // originalAction.schedule(counter + 1); + // }, 100); + // }, 1000); + // ``` + + if (this.pending === false && id != null) { + this.id = null; + root.clearInterval(id); + } }, delay); return this; @@ -51,13 +119,14 @@ export class FutureAction extends Subscription implements Action { protected _unsubscribe() { + this.pending = false; const {id, scheduler} = this; const {actions} = scheduler; const index = actions.indexOf(this); if (id != null) { this.id = null; - root.clearTimeout(id); + root.clearInterval(id); } if (index !== -1) { From 4486c1fa19f638379f339e77e97f271ef8aba15f Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Mon, 29 Feb 2016 16:09:51 -0800 Subject: [PATCH 2/3] feat(AsyncScheduler): add AsyncScheduler implementation --- src/Rx.DOM.ts | 3 +++ src/Rx.KitchenSink.ts | 3 +++ src/Rx.ts | 3 +++ src/scheduler/AsyncScheduler.ts | 10 ++++++++++ src/scheduler/async.ts | 3 +++ 5 files changed, 22 insertions(+) create mode 100644 src/scheduler/AsyncScheduler.ts create mode 100644 src/scheduler/async.ts diff --git a/src/Rx.DOM.ts b/src/Rx.DOM.ts index b56fb70330..fef7d595c6 100644 --- a/src/Rx.DOM.ts +++ b/src/Rx.DOM.ts @@ -124,9 +124,11 @@ import {EmptyError} from './util/EmptyError'; import {ArgumentOutOfRangeError} from './util/ArgumentOutOfRangeError'; import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError'; import {asap} from './scheduler/asap'; +import {async} from './scheduler/async'; import {queue} from './scheduler/queue'; import {animationFrame} from './scheduler/animationFrame'; import {AsapScheduler} from './scheduler/AsapScheduler'; +import {AsyncScheduler} from './scheduler/AsyncScheduler'; import {QueueScheduler} from './scheduler/QueueScheduler'; import {AnimationFrameScheduler} from './scheduler/AnimationFrameScheduler'; import {rxSubscriber} from './symbol/rxSubscriber'; @@ -136,6 +138,7 @@ import {AjaxRequest, AjaxResponse, AjaxError, AjaxTimeoutError} from './observab /* tslint:disable:no-var-keyword */ var Scheduler = { asap, + async, queue, animationFrame }; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index ac418b397f..75c2eb09a9 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -172,8 +172,10 @@ import {EmptyError} from './util/EmptyError'; import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError'; import {ArgumentOutOfRangeError} from './util/ArgumentOutOfRangeError'; import {asap} from './scheduler/asap'; +import {async} from './scheduler/async'; import {queue} from './scheduler/queue'; import {AsapScheduler} from './scheduler/AsapScheduler'; +import {AsyncScheduler} from './scheduler/AsyncScheduler'; import {QueueScheduler} from './scheduler/QueueScheduler'; import {TimeInterval} from './operator/timeInterval'; import {TestScheduler} from './testing/TestScheduler'; @@ -184,6 +186,7 @@ import {rxSubscriber} from './symbol/rxSubscriber'; /* tslint:disable:no-var-keyword */ var Scheduler = { asap, + async, queue }; diff --git a/src/Rx.ts b/src/Rx.ts index 15ec5b5488..9b5edf2a9c 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -129,8 +129,10 @@ import {EmptyError} from './util/EmptyError'; import {ArgumentOutOfRangeError} from './util/ArgumentOutOfRangeError'; import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError'; import {asap} from './scheduler/asap'; +import {async} from './scheduler/async'; import {queue} from './scheduler/queue'; import {AsapScheduler} from './scheduler/AsapScheduler'; +import {AsyncScheduler} from './scheduler/AsyncScheduler'; import {QueueScheduler} from './scheduler/QueueScheduler'; import {rxSubscriber} from './symbol/rxSubscriber'; /* tslint:enable:no-unused-variable */ @@ -138,6 +140,7 @@ import {rxSubscriber} from './symbol/rxSubscriber'; /* tslint:disable:no-var-keyword */ var Scheduler = { asap, + async, queue }; diff --git a/src/scheduler/AsyncScheduler.ts b/src/scheduler/AsyncScheduler.ts new file mode 100644 index 0000000000..a92a2a01c9 --- /dev/null +++ b/src/scheduler/AsyncScheduler.ts @@ -0,0 +1,10 @@ +import {Action} from './Action'; +import {FutureAction} from './FutureAction'; +import {Subscription} from '../Subscription'; +import {QueueScheduler} from './QueueScheduler'; + +export class AsyncScheduler extends QueueScheduler { + scheduleNow(work: (x?: any) => Subscription, state?: any): Action { + return new FutureAction(this, work).schedule(state, 0); + } +} diff --git a/src/scheduler/async.ts b/src/scheduler/async.ts new file mode 100644 index 0000000000..fce4d75f68 --- /dev/null +++ b/src/scheduler/async.ts @@ -0,0 +1,3 @@ +import {AsyncScheduler} from './AsyncScheduler'; + +export const async = new AsyncScheduler(); From 05719b2c257a6548ac127ebc9d51c5e21e829786 Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Mon, 29 Feb 2016 16:12:38 -0800 Subject: [PATCH 3/3] refactor(operators): substitute asap for async scheduler as default in operators --- spec/observables/interval-spec.ts | 2 +- src/observable/IntervalObservable.ts | 8 ++++---- src/observable/TimerObservable.ts | 4 ++-- src/operator/bufferTime.ts | 6 +++--- src/operator/debounceTime.ts | 4 ++-- src/operator/delay.ts | 4 ++-- src/operator/inspectTime.ts | 4 ++-- src/operator/sampleTime.ts | 4 ++-- src/operator/throttleTime.ts | 4 ++-- src/operator/timeInterval.ts | 4 ++-- src/operator/timeout.ts | 4 ++-- src/operator/timeoutWith.ts | 4 ++-- src/operator/windowTime.ts | 4 ++-- 13 files changed, 28 insertions(+), 28 deletions(-) diff --git a/spec/observables/interval-spec.ts b/spec/observables/interval-spec.ts index eef4d1c29a..d9c2503112 100644 --- a/spec/observables/interval-spec.ts +++ b/spec/observables/interval-spec.ts @@ -16,7 +16,7 @@ describe('Observable.interval', () => { it('should specify default scheduler if incorrect scheduler specified', () => { const scheduler = (Observable.interval(10, jasmine.createSpy('dummy'))).scheduler; - expect(scheduler).toBe(Rx.Scheduler.asap); + expect(scheduler).toBe(Rx.Scheduler.async); }); it('should emit when relative interval set to zero', () => { diff --git a/src/observable/IntervalObservable.ts b/src/observable/IntervalObservable.ts index 1238418b58..cf3cdda5d6 100644 --- a/src/observable/IntervalObservable.ts +++ b/src/observable/IntervalObservable.ts @@ -2,7 +2,7 @@ import {Subscriber} from '../Subscriber'; import {isNumeric} from '../util/isNumeric'; import {Scheduler} from '../Scheduler'; import {Observable} from '../Observable'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; /** * @@ -16,7 +16,7 @@ export class IntervalObservable extends Observable { * @name interval * @owner Observable */ - static create(period: number = 0, scheduler: Scheduler = asap): Observable { + static create(period: number = 0, scheduler: Scheduler = async): Observable { return new IntervalObservable(period, scheduler); } @@ -34,13 +34,13 @@ export class IntervalObservable extends Observable { ( this).schedule(state, period); } - constructor(private period: number = 0, private scheduler: Scheduler = asap) { + constructor(private period: number = 0, private scheduler: Scheduler = async) { super(); if (!isNumeric(period) || period < 0) { this.period = 0; } if (!scheduler || typeof scheduler.schedule !== 'function') { - this.scheduler = asap; + this.scheduler = async; } } diff --git a/src/observable/TimerObservable.ts b/src/observable/TimerObservable.ts index 438b4acadd..3db92b49c5 100644 --- a/src/observable/TimerObservable.ts +++ b/src/observable/TimerObservable.ts @@ -1,7 +1,7 @@ import {isNumeric} from '../util/isNumeric'; import {Scheduler} from '../Scheduler'; import {Observable} from '../Observable'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; import {isScheduler} from '../util/isScheduler'; import {isDate} from '../util/isDate'; import {Subscription} from '../Subscription'; @@ -58,7 +58,7 @@ export class TimerObservable extends Observable { } if (!isScheduler(scheduler)) { - scheduler = asap; + scheduler = async; } this.scheduler = scheduler; diff --git a/src/operator/bufferTime.ts b/src/operator/bufferTime.ts index 874761eb51..085fd616bc 100644 --- a/src/operator/bufferTime.ts +++ b/src/operator/bufferTime.ts @@ -3,7 +3,7 @@ import {Subscriber} from '../Subscriber'; import {Observable} from '../Observable'; import {Scheduler} from '../Scheduler'; import {Action} from '../scheduler/Action'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; /** * Buffers values from the source for a specific time period. Optionally allows @@ -15,7 +15,7 @@ import {asap} from '../scheduler/asap'; * before emitting them and clearing them. * @param {number} [bufferCreationInterval] the interval at which to start new * buffers. - * @param {Scheduler} [scheduler] (optional, defaults to `asap` scheduler) The + * @param {Scheduler} [scheduler] (optional, defaults to `async` scheduler) The * scheduler on which to schedule the intervals that determine buffer * boundaries. * @return {Observable} an observable of arrays of buffered values. @@ -24,7 +24,7 @@ import {asap} from '../scheduler/asap'; */ export function bufferTime(bufferTimeSpan: number, bufferCreationInterval: number = null, - scheduler: Scheduler = asap): Observable { + scheduler: Scheduler = async): Observable { return this.lift(new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, scheduler)); } diff --git a/src/operator/debounceTime.ts b/src/operator/debounceTime.ts index 5a9288d779..aebf4ff0f9 100644 --- a/src/operator/debounceTime.ts +++ b/src/operator/debounceTime.ts @@ -3,7 +3,7 @@ import {Observable} from '../Observable'; import {Subscriber} from '../Subscriber'; import {Scheduler} from '../Scheduler'; import {Subscription} from '../Subscription'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; /** * Returns the source Observable delayed by the computed debounce duration, @@ -19,7 +19,7 @@ import {asap} from '../scheduler/asap'; * @method debounceTime * @owner Observable */ -export function debounceTime(dueTime: number, scheduler: Scheduler = asap): Observable { +export function debounceTime(dueTime: number, scheduler: Scheduler = async): Observable { return this.lift(new DebounceTimeOperator(dueTime, scheduler)); } diff --git a/src/operator/delay.ts b/src/operator/delay.ts index 2b77150632..790d4b2c41 100644 --- a/src/operator/delay.ts +++ b/src/operator/delay.ts @@ -1,4 +1,4 @@ -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; import {isDate} from '../util/isDate'; import {Operator} from '../Operator'; import {Scheduler} from '../Scheduler'; @@ -16,7 +16,7 @@ import {Observable} from '../Observable'; * @owner Observable */ export function delay(delay: number|Date, - scheduler: Scheduler = asap): Observable { + scheduler: Scheduler = async): Observable { const absoluteDelay = isDate(delay); const delayFor = absoluteDelay ? (+delay - scheduler.now()) : Math.abs(delay); return this.lift(new DelayOperator(delayFor, scheduler)); diff --git a/src/operator/inspectTime.ts b/src/operator/inspectTime.ts index 94b69a7c38..b1c0cd7ab5 100644 --- a/src/operator/inspectTime.ts +++ b/src/operator/inspectTime.ts @@ -1,4 +1,4 @@ -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; import {Operator} from '../Operator'; import {Scheduler} from '../Scheduler'; import {Subscriber} from '../Subscriber'; @@ -12,7 +12,7 @@ import {Subscription} from '../Subscription'; * @method inspectTime * @owner Observable */ -export function inspectTime(delay: number, scheduler: Scheduler = asap): Observable { +export function inspectTime(delay: number, scheduler: Scheduler = async): Observable { return this.lift(new InspectTimeOperator(delay, scheduler)); } diff --git a/src/operator/sampleTime.ts b/src/operator/sampleTime.ts index e1ccd9551d..4b6869b96f 100644 --- a/src/operator/sampleTime.ts +++ b/src/operator/sampleTime.ts @@ -2,7 +2,7 @@ import {Observable} from '../Observable'; import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {Scheduler} from '../Scheduler'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; /** * @param delay @@ -11,7 +11,7 @@ import {asap} from '../scheduler/asap'; * @method sampleTime * @owner Observable */ -export function sampleTime(delay: number, scheduler: Scheduler = asap): Observable { +export function sampleTime(delay: number, scheduler: Scheduler = async): Observable { return this.lift(new SampleTimeOperator(delay, scheduler)); } diff --git a/src/operator/throttleTime.ts b/src/operator/throttleTime.ts index 30584325a4..2993a72088 100644 --- a/src/operator/throttleTime.ts +++ b/src/operator/throttleTime.ts @@ -2,7 +2,7 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {Scheduler} from '../Scheduler'; import {Subscription} from '../Subscription'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; import {Observable} from '../Observable'; /** @@ -12,7 +12,7 @@ import {Observable} from '../Observable'; * @method throttleTime * @owner Observable */ -export function throttleTime(delay: number, scheduler: Scheduler = asap): Observable { +export function throttleTime(delay: number, scheduler: Scheduler = async): Observable { return this.lift(new ThrottleTimeOperator(delay, scheduler)); } diff --git a/src/operator/timeInterval.ts b/src/operator/timeInterval.ts index 267c9e3878..60303f28c6 100644 --- a/src/operator/timeInterval.ts +++ b/src/operator/timeInterval.ts @@ -2,7 +2,7 @@ import {Operator} from '../Operator'; import {Observable} from '../Observable'; import {Subscriber} from '../Subscriber'; import {Scheduler} from '../Scheduler'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; /** * @param scheduler @@ -10,7 +10,7 @@ import {asap} from '../scheduler/asap'; * @method timeInterval * @owner Observable */ -export function timeInterval(scheduler: Scheduler = asap): Observable> { +export function timeInterval(scheduler: Scheduler = async): Observable> { return this.lift(new TimeIntervalOperator(scheduler)); } diff --git a/src/operator/timeout.ts b/src/operator/timeout.ts index e5beb33fdd..b5a16972d4 100644 --- a/src/operator/timeout.ts +++ b/src/operator/timeout.ts @@ -1,4 +1,4 @@ -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; import {isDate} from '../util/isDate'; import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; @@ -15,7 +15,7 @@ import {Observable} from '../Observable'; */ export function timeout(due: number | Date, errorToSend: any = null, - scheduler: Scheduler = asap): Observable { + scheduler: Scheduler = async): Observable { let absoluteTimeout = isDate(due); let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(due); return this.lift(new TimeoutOperator(waitFor, absoluteTimeout, errorToSend, scheduler)); diff --git a/src/operator/timeoutWith.ts b/src/operator/timeoutWith.ts index 606e0a124b..9a97b7621c 100644 --- a/src/operator/timeoutWith.ts +++ b/src/operator/timeoutWith.ts @@ -1,7 +1,7 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {Scheduler} from '../Scheduler'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; import {Subscription} from '../Subscription'; import {Observable} from '../Observable'; import {isDate} from '../util/isDate'; @@ -18,7 +18,7 @@ import {subscribeToResult} from '../util/subscribeToResult'; */ export function timeoutWith(due: number | Date, withObservable: Observable, - scheduler: Scheduler = asap): Observable { + scheduler: Scheduler = async): Observable { let absoluteTimeout = isDate(due); let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(due); return this.lift(new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler)); diff --git a/src/operator/windowTime.ts b/src/operator/windowTime.ts index 6a5a884d44..9a066ccf5f 100644 --- a/src/operator/windowTime.ts +++ b/src/operator/windowTime.ts @@ -4,7 +4,7 @@ import {Observable} from '../Observable'; import {Subject} from '../Subject'; import {Scheduler} from '../Scheduler'; import {Action} from '../scheduler/Action'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; /** * @param windowTimeSpan @@ -16,7 +16,7 @@ import {asap} from '../scheduler/asap'; */ export function windowTime(windowTimeSpan: number, windowCreationInterval: number = null, - scheduler: Scheduler = asap): Observable> { + scheduler: Scheduler = async): Observable> { return this.lift(new WindowTimeOperator(windowTimeSpan, windowCreationInterval, scheduler)); }