Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(async): adds AsyncScheduler and updates appropriate operators. #1395

Merged
merged 3 commits into from
Mar 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spec/observables/interval-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ describe('Observable.interval', () => {
it('should specify default scheduler if incorrect scheduler specified', () => {
const scheduler = (<any>Observable.interval(10, <any>jasmine.createSpy('dummy'))).scheduler;

expect(scheduler).toBe(Rx.Scheduler.asap);
expect(scheduler).toBe(Rx.Scheduler.async);
});

it('should emit when relative interval set to zero', () => {
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.DOM.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,11 @@ import {EmptyError} from './util/EmptyError';
import {ArgumentOutOfRangeError} from './util/ArgumentOutOfRangeError';
import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError';
import {asap} from './scheduler/asap';
import {async} from './scheduler/async';
import {queue} from './scheduler/queue';
import {animationFrame} from './scheduler/animationFrame';
import {AsapScheduler} from './scheduler/AsapScheduler';
import {AsyncScheduler} from './scheduler/AsyncScheduler';
import {QueueScheduler} from './scheduler/QueueScheduler';
import {AnimationFrameScheduler} from './scheduler/AnimationFrameScheduler';
import {rxSubscriber} from './symbol/rxSubscriber';
Expand All @@ -136,6 +138,7 @@ import {AjaxRequest, AjaxResponse, AjaxError, AjaxTimeoutError} from './observab
/* tslint:disable:no-var-keyword */
var Scheduler = {
asap,
async,
queue,
animationFrame
};
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,10 @@ import {EmptyError} from './util/EmptyError';
import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError';
import {ArgumentOutOfRangeError} from './util/ArgumentOutOfRangeError';
import {asap} from './scheduler/asap';
import {async} from './scheduler/async';
import {queue} from './scheduler/queue';
import {AsapScheduler} from './scheduler/AsapScheduler';
import {AsyncScheduler} from './scheduler/AsyncScheduler';
import {QueueScheduler} from './scheduler/QueueScheduler';
import {TimeInterval} from './operator/timeInterval';
import {TestScheduler} from './testing/TestScheduler';
Expand All @@ -184,6 +186,7 @@ import {rxSubscriber} from './symbol/rxSubscriber';
/* tslint:disable:no-var-keyword */
var Scheduler = {
asap,
async,
queue
};

Expand Down
3 changes: 3 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,18 @@ import {EmptyError} from './util/EmptyError';
import {ArgumentOutOfRangeError} from './util/ArgumentOutOfRangeError';
import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError';
import {asap} from './scheduler/asap';
import {async} from './scheduler/async';
import {queue} from './scheduler/queue';
import {AsapScheduler} from './scheduler/AsapScheduler';
import {AsyncScheduler} from './scheduler/AsyncScheduler';
import {QueueScheduler} from './scheduler/QueueScheduler';
import {rxSubscriber} from './symbol/rxSubscriber';
/* tslint:enable:no-unused-variable */

/* tslint:disable:no-var-keyword */
var Scheduler = {
asap,
async,
queue
};

Expand Down
8 changes: 4 additions & 4 deletions src/observable/IntervalObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Subscriber} from '../Subscriber';
import {isNumeric} from '../util/isNumeric';
import {Scheduler} from '../Scheduler';
import {Observable} from '../Observable';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';

/**
*
Expand All @@ -16,7 +16,7 @@ export class IntervalObservable extends Observable<number> {
* @name interval
* @owner Observable
*/
static create(period: number = 0, scheduler: Scheduler = asap): Observable<number> {
static create(period: number = 0, scheduler: Scheduler = async): Observable<number> {
return new IntervalObservable(period, scheduler);
}

Expand All @@ -34,13 +34,13 @@ export class IntervalObservable extends Observable<number> {
(<any> this).schedule(state, period);
}

constructor(private period: number = 0, private scheduler: Scheduler = asap) {
constructor(private period: number = 0, private scheduler: Scheduler = async) {
super();
if (!isNumeric(period) || period < 0) {
this.period = 0;
}
if (!scheduler || typeof scheduler.schedule !== 'function') {
this.scheduler = asap;
this.scheduler = async;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/observable/TimerObservable.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {isNumeric} from '../util/isNumeric';
import {Scheduler} from '../Scheduler';
import {Observable} from '../Observable';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';
import {isScheduler} from '../util/isScheduler';
import {isDate} from '../util/isDate';
import {Subscription} from '../Subscription';
Expand Down Expand Up @@ -58,7 +58,7 @@ export class TimerObservable extends Observable<number> {
}

if (!isScheduler(scheduler)) {
scheduler = asap;
scheduler = async;
}

this.scheduler = scheduler;
Expand Down
6 changes: 3 additions & 3 deletions src/operator/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';
import {Scheduler} from '../Scheduler';
import {Action} from '../scheduler/Action';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';

/**
* Buffers values from the source for a specific time period. Optionally allows
Expand All @@ -15,7 +15,7 @@ import {asap} from '../scheduler/asap';
* before emitting them and clearing them.
* @param {number} [bufferCreationInterval] the interval at which to start new
* buffers.
* @param {Scheduler} [scheduler] (optional, defaults to `asap` scheduler) The
* @param {Scheduler} [scheduler] (optional, defaults to `async` scheduler) The
* scheduler on which to schedule the intervals that determine buffer
* boundaries.
* @return {Observable<T[]>} an observable of arrays of buffered values.
Expand All @@ -24,7 +24,7 @@ import {asap} from '../scheduler/asap';
*/
export function bufferTime<T>(bufferTimeSpan: number,
bufferCreationInterval: number = null,
scheduler: Scheduler = asap): Observable<T[]> {
scheduler: Scheduler = async): Observable<T[]> {
return this.lift(new BufferTimeOperator<T>(bufferTimeSpan, bufferCreationInterval, scheduler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/debounceTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {Subscription} from '../Subscription';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';

/**
* Returns the source Observable delayed by the computed debounce duration,
Expand All @@ -19,7 +19,7 @@ import {asap} from '../scheduler/asap';
* @method debounceTime
* @owner Observable
*/
export function debounceTime<T>(dueTime: number, scheduler: Scheduler = asap): Observable<T> {
export function debounceTime<T>(dueTime: number, scheduler: Scheduler = async): Observable<T> {
return this.lift(new DebounceTimeOperator(dueTime, scheduler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/delay.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';
import {isDate} from '../util/isDate';
import {Operator} from '../Operator';
import {Scheduler} from '../Scheduler';
Expand All @@ -16,7 +16,7 @@ import {Observable} from '../Observable';
* @owner Observable
*/
export function delay<T>(delay: number|Date,
scheduler: Scheduler = asap): Observable<T> {
scheduler: Scheduler = async): Observable<T> {
const absoluteDelay = isDate(delay);
const delayFor = absoluteDelay ? (+delay - scheduler.now()) : Math.abs(<number>delay);
return this.lift(new DelayOperator(delayFor, scheduler));
Expand Down
4 changes: 2 additions & 2 deletions src/operator/inspectTime.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';
import {Operator} from '../Operator';
import {Scheduler} from '../Scheduler';
import {Subscriber} from '../Subscriber';
Expand All @@ -12,7 +12,7 @@ import {Subscription} from '../Subscription';
* @method inspectTime
* @owner Observable
*/
export function inspectTime<T>(delay: number, scheduler: Scheduler = asap): Observable<T> {
export function inspectTime<T>(delay: number, scheduler: Scheduler = async): Observable<T> {
return this.lift(new InspectTimeOperator(delay, scheduler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/sampleTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Observable} from '../Observable';
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';

/**
* @param delay
Expand All @@ -11,7 +11,7 @@ import {asap} from '../scheduler/asap';
* @method sampleTime
* @owner Observable
*/
export function sampleTime<T>(delay: number, scheduler: Scheduler = asap): Observable<T> {
export function sampleTime<T>(delay: number, scheduler: Scheduler = async): Observable<T> {
return this.lift(new SampleTimeOperator(delay, scheduler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/throttleTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {Subscription} from '../Subscription';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';
import {Observable} from '../Observable';

/**
Expand All @@ -12,7 +12,7 @@ import {Observable} from '../Observable';
* @method throttleTime
* @owner Observable
*/
export function throttleTime<T>(delay: number, scheduler: Scheduler = asap): Observable<T> {
export function throttleTime<T>(delay: number, scheduler: Scheduler = async): Observable<T> {
return this.lift(new ThrottleTimeOperator(delay, scheduler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/timeInterval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ import {Operator} from '../Operator';
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';

/**
* @param scheduler
* @return {Observable<TimeInterval<any>>|WebSocketSubject<T>|Observable<T>}
* @method timeInterval
* @owner Observable
*/
export function timeInterval<T>(scheduler: Scheduler = asap): Observable<TimeInterval<T>> {
export function timeInterval<T>(scheduler: Scheduler = async): Observable<TimeInterval<T>> {
return this.lift(new TimeIntervalOperator(scheduler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/timeout.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';
import {isDate} from '../util/isDate';
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
Expand All @@ -15,7 +15,7 @@ import {Observable} from '../Observable';
*/
export function timeout<T>(due: number | Date,
errorToSend: any = null,
scheduler: Scheduler = asap): Observable<T> {
scheduler: Scheduler = async): Observable<T> {
let absoluteTimeout = isDate(due);
let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(<number>due);
return this.lift(new TimeoutOperator(waitFor, absoluteTimeout, errorToSend, scheduler));
Expand Down
4 changes: 2 additions & 2 deletions src/operator/timeoutWith.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';
import {Subscription} from '../Subscription';
import {Observable} from '../Observable';
import {isDate} from '../util/isDate';
Expand All @@ -18,7 +18,7 @@ import {subscribeToResult} from '../util/subscribeToResult';
*/
export function timeoutWith<T, R>(due: number | Date,
withObservable: Observable<R>,
scheduler: Scheduler = asap): Observable<T | R> {
scheduler: Scheduler = async): Observable<T | R> {
let absoluteTimeout = isDate(due);
let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(<number>due);
return this.lift(new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler));
Expand Down
4 changes: 2 additions & 2 deletions src/operator/windowTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {Observable} from '../Observable';
import {Subject} from '../Subject';
import {Scheduler} from '../Scheduler';
import {Action} from '../scheduler/Action';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';

/**
* @param windowTimeSpan
Expand All @@ -16,7 +16,7 @@ import {asap} from '../scheduler/asap';
*/
export function windowTime<T>(windowTimeSpan: number,
windowCreationInterval: number = null,
scheduler: Scheduler = asap): Observable<Observable<T>> {
scheduler: Scheduler = async): Observable<Observable<T>> {
return this.lift(new WindowTimeOperator<T>(windowTimeSpan, windowCreationInterval, scheduler));
}

Expand Down
10 changes: 10 additions & 0 deletions src/scheduler/AsyncScheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import {Action} from './Action';
import {FutureAction} from './FutureAction';
import {Subscription} from '../Subscription';
import {QueueScheduler} from './QueueScheduler';

export class AsyncScheduler extends QueueScheduler {
scheduleNow<T>(work: (x?: any) => Subscription, state?: any): Action {
return new FutureAction(this, work).schedule(state, 0);
}
}
Loading