From bb440ad42669dd99bf511a3d2e426573e3d3ff3e Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 1 Sep 2015 12:39:10 -0700 Subject: [PATCH] feat(operator): add timeout and timeoutWith adds two operators timeout and timeoutWith. The former is for sending errors on timeout the latter is for continuing with an Observable on timeout. closes #244 --- spec/operators/timeout-spec.js | 44 ++++++++++++++++++++++++++++++ spec/operators/timeoutWith-spec.js | 23 ++++++++++++++++ src/Observable.ts | 2 ++ src/Rx.ts | 4 +++ src/operators/timeout.ts | 39 ++++++++++++++++++++++++++ src/operators/timeoutWith.ts | 41 ++++++++++++++++++++++++++++ src/util/isDate.ts | 3 ++ 7 files changed, 156 insertions(+) create mode 100644 spec/operators/timeout-spec.js create mode 100644 spec/operators/timeoutWith-spec.js create mode 100644 src/operators/timeout.ts create mode 100644 src/operators/timeoutWith.ts create mode 100644 src/util/isDate.ts diff --git a/spec/operators/timeout-spec.js b/spec/operators/timeout-spec.js new file mode 100644 index 0000000000..185678e9dd --- /dev/null +++ b/spec/operators/timeout-spec.js @@ -0,0 +1,44 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.timeout', function () { + it('should timeout after a specified delay', function (done) { + Observable.never().timeout(100) + .subscribe(function (x) { + throw 'should not next'; + }, function (err) { + expect(err.message).toBe('timeout'); + done(); + }, function () { + throw 'should not complete'; + }); + }, 2000); + + it('should timeout after a delay and send the passed error', function (done) { + Observable.never().timeout(100, 'hello') + .subscribe(function () { + throw 'should not next'; + }, function (err) { + expect(err).toBe('hello'); + done(); + }, function () { + throw 'should not complete'; + }) + }); + + + it('should timeout at a specified Date', function (done) { + var date = new Date(Date.now() + 100); + + Observable.never().timeout(date) + .subscribe(function (x) { + throw 'should not next'; + }, function (err) { + expect(err.message).toBe('timeout'); + done(); + }, function () { + throw 'should not complete'; + }); + }, 2000); +}); \ No newline at end of file diff --git a/spec/operators/timeoutWith-spec.js b/spec/operators/timeoutWith-spec.js new file mode 100644 index 0000000000..ed418f2726 --- /dev/null +++ b/spec/operators/timeoutWith-spec.js @@ -0,0 +1,23 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.timeoutWith', function () { + it('should timeout after a specified delay then subscribe to the passed observable', function (done) { + var expected = [1, 2, 3]; + Observable.never().timeoutWith(100, Observable.of(1,2,3)) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, null, done); + }, 2000); + + + it('should timeout at a specified date then subscribe to the passed observable', function (done) { + var expected = [1, 2, 3]; + var date = new Date(Date.now() + 100); + Observable.never().timeoutWith(date, Observable.of(1,2,3)) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, null, done); + }, 2000); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 894c426651..c4d0eb6da5 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -175,4 +175,6 @@ export default class Observable { bufferCount: (bufferSize: number, startBufferEvery: number) => Observable; finally: (ensure: () => void, thisArg?: any) => Observable; + timeout: (due: number|Date, errorToSend?: any, scheduler?: Scheduler) => Observable; + timeoutWith: (due: number|Date, withObservable: Observable, scheduler?: Scheduler) => Observable; } \ No newline at end of file diff --git a/src/Rx.ts b/src/Rx.ts index 782f01a2e9..51502b37f0 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -159,8 +159,12 @@ observableProto.retryWhen = retryWhen; observableProto.repeat = repeat; import _finally from './operators/finally'; +import timeout from './operators/timeout'; +import timeoutWith from './operators/timeoutWith'; observableProto.finally = _finally; +observableProto.timeout = timeout; +observableProto.timeoutWith = timeoutWith; import groupBy from './operators/groupBy'; import window from './operators/window'; diff --git a/src/operators/timeout.ts b/src/operators/timeout.ts new file mode 100644 index 0000000000..48c28e299a --- /dev/null +++ b/src/operators/timeout.ts @@ -0,0 +1,39 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Scheduler from '../Scheduler'; +import Subscription from '../Subscription'; +import isDate from '../util/isDate'; + +export default function timeout(due: number|Date, errorToSend: any = null, scheduler: Scheduler = Scheduler.immediate) { + let waitFor = isDate(due) ? (+due - Date.now()) : due; + return this.lift(new TimeoutOperator(waitFor, errorToSend, scheduler)); +} + +class TimeoutOperator implements Operator { + constructor(private waitFor: number, private errorToSend: any, private scheduler: Scheduler) { + } + + call(observer: Observer) { + return new TimeoutSubscriber(observer, this.waitFor, this.errorToSend, this.scheduler); + } +} + +class TimeoutSubscriber extends Subscriber { + timeoutSubscription: Subscription; + + constructor(destination: Observer, private waitFor: number, private errorToSend: any, private scheduler: Scheduler) { + super(destination); + let delay = waitFor; + scheduler.schedule(delay, { subscriber: this }, dispatchTimeout); + } + + sendTimeoutError() { + this.error(this.errorToSend || new Error('timeout')); + } +} + +function dispatchTimeout(state: { subscriber: TimeoutSubscriber }) { + const subscriber = state.subscriber; + subscriber.sendTimeoutError(); +} \ No newline at end of file diff --git a/src/operators/timeoutWith.ts b/src/operators/timeoutWith.ts new file mode 100644 index 0000000000..25902a9e21 --- /dev/null +++ b/src/operators/timeoutWith.ts @@ -0,0 +1,41 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Scheduler from '../Scheduler'; +import Subscription from '../Subscription'; +import Observable from '../Observable'; +import isDate from '../util/isDate'; + +export default function timeoutWith(due: number|Date, withObservable: Observable, scheduler: Scheduler = Scheduler.immediate) { + let waitFor = isDate(due) ? (+due - Date.now()) : due; + return this.lift(new TimeoutWithOperator(waitFor, withObservable, scheduler)); +} + +class TimeoutWithOperator implements Operator { + constructor(private waitFor: number, private withObservable: Observable, private scheduler: Scheduler) { + } + + call(observer: Observer) { + return new TimeoutWithSubscriber(observer, this.waitFor, this.withObservable, this.scheduler); + } +} + +class TimeoutWithSubscriber extends Subscriber { + timeoutSubscription: Subscription; + + constructor(destination: Observer, private waitFor: number, private withObservable: Observable, private scheduler: Scheduler) { + super(destination); + let delay = waitFor; + scheduler.schedule(delay, { subscriber: this }, dispatchTimeout); + } + + handleTimeout() { + const withObservable = this.withObservable; + this.add(withObservable.subscribe(this)); + } +} + +function dispatchTimeout(state: { subscriber: TimeoutWithSubscriber }) { + const subscriber = state.subscriber; + subscriber.handleTimeout(); +} \ No newline at end of file diff --git a/src/util/isDate.ts b/src/util/isDate.ts new file mode 100644 index 0000000000..d1825cdd3c --- /dev/null +++ b/src/util/isDate.ts @@ -0,0 +1,3 @@ +export default function isDate(value) { + return value instanceof Date && !isNaN(+value); +} \ No newline at end of file