Skip to content

Commit

Permalink
refactor(Action): move delay setting to Action to support Virtual Tim…
Browse files Browse the repository at this point in the history
…e Scheduling
  • Loading branch information
benlesh committed Sep 6, 2015
1 parent e892568 commit af1cb68
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 50 deletions.
17 changes: 15 additions & 2 deletions spec/schedulers/VirtualTimeScheduler-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ describe('VirtualTimeScheduler', function() {
expect(invoked).toEqual([1, 2, 3, 4, 5]);
});



it('should schedule things in order when flushed if each this is scheduled at random', function () {
var v = new VirtualTimeScheduler();
var invoked = [];
Expand All @@ -43,4 +41,19 @@ describe('VirtualTimeScheduler', function() {

expect(invoked).toEqual([1, 3, 5, 2, 6, 4]);
});

it('should support recursive scheduling', function () {
var v = new VirtualTimeScheduler();
var count = 0;
var expected = [100, 200, 300];

v.schedule(function (state) {
if (++count === 3) return;
expect(this.delay).toBe(expected.shift());
this.schedule(state, this.delay)
}, 100, 'test');

v.flush();
expect(count).toBe(3);
});
});
8 changes: 4 additions & 4 deletions src/observables/IntervalObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ export default class IntervalObservable<T> extends Observable<T> {

static dispatch(state) {

const { index, subscriber } = state;
const { index, subscriber, period } = state;

subscriber.next(index);

if (subscriber.isUnsubscribed) {
return;
}

state.index = index + 1;
state.index += 1;

(<any> this).schedule(state);
(<any> this).schedule(state, period);
}

constructor(private period: number = 0, private scheduler: Scheduler = nextTick) {
Expand All @@ -41,7 +41,7 @@ export default class IntervalObservable<T> extends Observable<T> {
const scheduler = this.scheduler;

subscriber.add(scheduler.schedule(IntervalObservable.dispatch, period, {
index, subscriber
index, subscriber, period
}));
}
}
2 changes: 1 addition & 1 deletion src/observables/TimerObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export default class TimerObservable<T> extends Observable<T> {
}));
} else {
state.index = index + 1;
action.schedule(state);
action.schedule(state, period);
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/operators/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class BufferTimeSubscriber<T> extends Subscriber<T> {
this.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber: this, buffer }));
this.add(scheduler.schedule(dispatchBufferCreation, bufferCreationInterval, { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler }));
} else {
this.add(scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, { subscriber: this, buffer }));
this.add(scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, { subscriber: this, buffer, bufferTimeSpan }));
}
}

Expand Down Expand Up @@ -82,15 +82,15 @@ function dispatchBufferTimeSpanOnly(state) {
}

state.buffer = subscriber.openBuffer();
(<any>this).schedule(state);
(<any>this).schedule(state, state.bufferTimeSpan);
}

function dispatchBufferCreation(state) {
let { bufferTimeSpan, subscriber, scheduler } = state;
let { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state;
let buffer = subscriber.openBuffer();
var action = <Action>this;
action.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber, buffer }));
action.schedule(state);
action.schedule(state, bufferCreationInterval);
}

function dispatchBufferClose({ subscriber, buffer }) {
Expand Down
4 changes: 2 additions & 2 deletions src/operators/delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class DelaySubscriber<T> extends Subscriber<T> {
queue.shift().notification.observe(destination);
}
if (queue.length > 0) {
(<any> this).delay = Math.max(0, queue[0].time - scheduler.now());
(<any> this).schedule(state);
let delay = Math.max(0, queue[0].time - scheduler.now());
(<any> this).schedule(state, delay);
} else {
source.active = false;
}
Expand Down
9 changes: 5 additions & 4 deletions src/operators/sampleTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class SampleTimeSubscriber<T> extends Subscriber<T> {

constructor(destination: Observer<T>, private delay: number, private scheduler: Scheduler) {
super(destination);
this.add(scheduler.schedule(dispatchNotification, delay, { subscriber: this }));
this.add(scheduler.schedule(dispatchNotification, delay, { subscriber: this, delay }));
}

_next(value: T) {
Expand All @@ -40,7 +40,8 @@ class SampleTimeSubscriber<T> extends Subscriber<T> {
}
}

function dispatchNotification<T>(state: { subscriber: SampleTimeSubscriber<T> }) {
state.subscriber.notifyNext();
(<any>this).schedule(state);
function dispatchNotification<T>(state) {
let { subscriber, delay } = state;
subscriber.notifyNext();
(<any>this).schedule(state, delay);
}
21 changes: 9 additions & 12 deletions src/operators/windowTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
this.add(scheduler.schedule(dispatchWindowCreation, windowCreationInterval, { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler }))
} else {
let window = this.openWindow();
this.add(scheduler.schedule(dispatchWindowTimeSpanOnly, windowTimeSpan, { subscriber: this, window }));
this.add(scheduler.schedule(dispatchWindowTimeSpanOnly, windowTimeSpan, { subscriber: this, window, windowTimeSpan }));
}
}

Expand Down Expand Up @@ -79,25 +79,22 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
}
}

function dispatchWindowTimeSpanOnly(state) {
const subscriber: WindowTimeSubscriber<any> = state.subscriber;

const prevWindow: Subject<any> = state.window;
if (prevWindow) {
prevWindow.complete();
function dispatchWindowTimeSpanOnly<T>(state: { window: Subject<any>, windowTimeSpan: number, subscriber: WindowTimeSubscriber<T>}) {
const { subscriber, windowTimeSpan, window } = state;
if (window) {
window.complete();
}

let window = subscriber.openWindow();
(<any>this).schedule({ subscriber, window });
state.window = subscriber.openWindow();
(<any>this).schedule(state, windowTimeSpan);
}

function dispatchWindowCreation(state) {
let { windowTimeSpan, subscriber, scheduler } = state;
let { windowTimeSpan, subscriber, scheduler, windowCreationInterval } = state;
let window = subscriber.openWindow();
let action = <Action>this;
let context = { action, subscription: null };
action.add(context.subscription = scheduler.schedule(dispatchWindowClose, windowTimeSpan, { subscriber, window, context }));
action.schedule(state);
action.schedule(state, windowCreationInterval);
}

function dispatchWindowClose({ subscriber, window, context }) {
Expand Down
6 changes: 4 additions & 2 deletions src/schedulers/Action.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import Subscription from '../Subscription';
import Scheduler from '../Scheduler';

interface Action extends Subscription<any> {
work: (state?: any) => void|Subscription<any>
state: any;
state?: any;
delay?: number;
schedule(state: any);
schedule(state?: any, delay?: number);
execute(): void;
scheduler: Scheduler;
}

export default Action;
10 changes: 5 additions & 5 deletions src/schedulers/FutureAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ import ImmediateAction from './ImmediateAction';
export default class FutureAction<T> extends ImmediateAction<T> {

id: any;

delay: number;

constructor(public scheduler: ImmediateScheduler,
public work: (x?: any) => Subscription<T> | void,
public delay: number) {
public work: (x?: any) => Subscription<T> | void) {
super(scheduler, work);
}

schedule(state?:any): Action {
schedule(state?:any, delay:number = 0): Action {
if (this.isUnsubscribed) {
return this;
}

this.delay = delay;
this.state = state;

const id = this.id;

if (id != null) {
Expand Down
2 changes: 1 addition & 1 deletion src/schedulers/ImmediateScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ export default class ImmediateScheduler implements Scheduler {
}

scheduleLater<T>(work: (x?: any) => Subscription<T> | void, delay: number, state?: any): Action {
return new FutureAction(this, work, delay).schedule(state);
return new FutureAction(this, work).schedule(state, delay);
}
}
29 changes: 16 additions & 13 deletions src/schedulers/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export default class VirtualTimeScheduler implements Scheduler {
scheduled: boolean = false;
index: number = 0;
sorted: boolean = false;
frame: number = -1;
frame: number = 0;

now() {
return 0;
Expand All @@ -25,38 +25,41 @@ export default class VirtualTimeScheduler implements Scheduler {

flush() {
this.sortActions();
this.actions.forEach((action, frame) => {
this.frame = frame;
const actions = this.actions;
while (actions.length > 0) {
let action = actions.shift();
this.frame = action.delay;
action.execute();
});
this.actions.length = 0;
this.frame = -1;
}
this.frame = 0;
}

schedule<T>(work: (x?: any) => Subscription<T> | void, delay: number = 0, state?: any): Subscription<T> {
this.sorted = false;
return new VirtualAction(this, work, delay, this.index++).schedule(state);
return new VirtualAction(this, work, this.index++).schedule(state, delay);
}
}

class VirtualAction<T> extends Subscription<T> implements Action {
state: any;

delay: number;

constructor(public scheduler: VirtualTimeScheduler,
public work: (x?: any) => Subscription<T> | void,
public delay: number,
public index: number) {
super();
}

schedule(state?: any): VirtualAction<T> {
schedule(state?: any, delay: number = 0): VirtualAction<T> {
if (this.isUnsubscribed) {
return this;
}

this.state = state;
const scheduler = this.scheduler;
scheduler.actions.push(this);
var action = scheduler.frame === this.delay ? this :
new VirtualAction(scheduler, this.work, scheduler.index += 1);
action.state = state;
action.delay = scheduler.frame + delay;
scheduler.actions.push(action);
return this;
}

Expand Down
3 changes: 3 additions & 0 deletions src/util/isScheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export default function isScheduler(value: any): boolean {
return value && typeof value.schedule === 'function';
}

0 comments on commit af1cb68

Please sign in to comment.