From 16bd691f2bbd076e4d4b5c4eaeed5307ed0ccd18 Mon Sep 17 00:00:00 2001 From: kwonoj Date: Wed, 30 Sep 2015 01:59:29 -0700 Subject: [PATCH] fix(timeout): update behavior of timeout, timeoutWith --- spec/operators/timeout-spec.js | 79 +++++++++------- spec/operators/timeoutWith-spec.js | 143 ++++++++++++++++++++++++++--- src/operators/timeout.ts | 66 ++++++++++--- src/operators/timeoutWith.ts | 80 ++++++++++++---- 4 files changed, 288 insertions(+), 80 deletions(-) diff --git a/spec/operators/timeout-spec.js b/spec/operators/timeout-spec.js index 185678e9dd..15ec279f54 100644 --- a/spec/operators/timeout-spec.js +++ b/spec/operators/timeout-spec.js @@ -1,44 +1,53 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, expectObservable, hot, rxTestScheduler */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; -describe('Observable.prototype.timeout', function () { - it('should timeout after a specified delay', function (done) { - Observable.never().timeout(100) - .subscribe(function (x) { - throw 'should not next'; - }, function (err) { - expect(err.message).toBe('timeout'); - done(); - }, function () { - throw 'should not complete'; - }); - }, 2000); +describe('Observable.prototype.timeout()', function () { + var defaultTimeoutError = new Error('timeout'); - it('should timeout after a delay and send the passed error', function (done) { - Observable.never().timeout(100, 'hello') - .subscribe(function () { - throw 'should not next'; - }, function (err) { - expect(err).toBe('hello'); - done(); - }, function () { - throw 'should not complete'; - }) + it('should timeout after a specified timeout period', function () { + var e1 = Observable.never(); + var expected = '-----#'; + + expectObservable(e1.timeout(50, null, rxTestScheduler)).toBe(expected, null, defaultTimeoutError); + }); + + it('should timeout after specified timeout period and send the passed error', function () { + var e1 = Observable.never(); + var expected = '-----#'; + var value = 'hello'; + + expectObservable(e1.timeout(50, value, rxTestScheduler)).toBe(expected, null, value); + }); + + it('should not timeout if source completes within absolute timeout period', function() { + var e1 = hot('--a--b--c--d--e--|'); + var expected = '--a--b--c--d--e--|'; + + var timeoutValue = new Date(Date.now() + (expected.length + 2) * 10); + + expectObservable(e1.timeout(timeoutValue, null, rxTestScheduler)).toBe(expected); }); + it('should not timeout if source emits within timeout period', function() { + var e1 = hot('--a--b--c--d--e--|'); + var expected = '--a--b--c--d--e--|'; + + expectObservable(e1.timeout(50, null, rxTestScheduler)).toBe(expected); + }); - it('should timeout at a specified Date', function (done) { - var date = new Date(Date.now() + 100); + it('should timeout after a specified timeout period between emit with default error while source emits', function () { + var e1 = hot('---a---b---c------d---e---|'); + var expected = '---a---b---c----#'; - Observable.never().timeout(date) - .subscribe(function (x) { - throw 'should not next'; - }, function (err) { - expect(err.message).toBe('timeout'); - done(); - }, function () { - throw 'should not complete'; - }); - }, 2000); + expectObservable(e1.timeout(50, null, rxTestScheduler)).toBe(expected, {a: 'a', b: 'b', c: 'c'}, defaultTimeoutError); + }); + + it('should timeout after a specified delay with passed error while source emits', function () { + var value = 'hello'; + var e1 = hot('---a---b---c------d---e---|'); + var expected = '---a---b---c----#'; + + expectObservable(e1.timeout(50, value, rxTestScheduler)).toBe(expected, {a: 'a', b: 'b', c: 'c'}, value); + }); }); \ No newline at end of file diff --git a/spec/operators/timeoutWith-spec.js b/spec/operators/timeoutWith-spec.js index ed418f2726..da51359442 100644 --- a/spec/operators/timeoutWith-spec.js +++ b/spec/operators/timeoutWith-spec.js @@ -1,23 +1,136 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, expectObservable, hot, cold, rxTestScheduler */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; -describe('Observable.prototype.timeoutWith', function () { - it('should timeout after a specified delay then subscribe to the passed observable', function (done) { - var expected = [1, 2, 3]; - Observable.never().timeoutWith(100, Observable.of(1,2,3)) - .subscribe(function (x) { - expect(x).toBe(expected.shift()); - }, null, done); +describe('Observable.prototype.timeoutWith()', function () { + it('should timeout after a specified period then subscribe to the passed observable', function () { + var e1 = Observable.never(); + var e2 = cold('--x--y--z--|'); + var expected = '-------x--y--z--|'; + + expectObservable(e1.timeoutWith(50, e2, rxTestScheduler)).toBe(expected); + }); + + it('should timeout at a specified date then subscribe to the passed observable', function (done) { + var expected = ['x', 'y', 'z']; + var e1 = Observable.never(); + var e2 = Observable.fromArray(expected); + + var res = []; + e1.timeoutWith(new Date(Date.now() + 100), e2) + .subscribe(function (x) { + res.push(x); + }, function(x) { + throw 'should not be called'; + }, function() { + expect(res).toEqual(expected); + done(); + }); }, 2000); + it('should timeout after a specified period between emit then subscribe to the passed observable when source emits', function () { + var e1 = hot('---a---b------c---|'); + var e2 = cold('-x-y-|'); + var expected = '---a---b----x-y-|'; + + expectObservable(e1.timeoutWith(40, e2, rxTestScheduler)).toBe(expected); + }); - it('should timeout at a specified date then subscribe to the passed observable', function (done) { - var expected = [1, 2, 3]; - var date = new Date(Date.now() + 100); - Observable.never().timeoutWith(date, Observable.of(1,2,3)) - .subscribe(function (x) { - expect(x).toBe(expected.shift()); - }, null, done); + it('should timeout after a specified period then subscribe to the passed observable when source is empty', function () { + var e1 = hot('-------------|'); + var e2 = cold('----x----|'); + var expected = '--------------x----|'; + + expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected); + }); + + it('should timeout after a specified period between emit then never completes if other source does not complete', function () { + var e1 = hot('--a--b--------c--d--|'); + var e2 = cold('-'); + var expected = '--a--b----'; + + expectObservable(e1.timeoutWith(40, e2, rxTestScheduler)).toBe(expected); + }); + + it('should timeout after a specified period then subscribe to the passed observable when source raises error after timeout', function () { + var e1 = hot('-------------#'); + var e2 = cold('----x----|'); + var expected = '--------------x----|'; + + expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected); + }); + + it('should timeout after a specified period between emit then never completes if other source emits but not complete', function () { + var e1 = hot('-------------|'); + var e2 = cold('----x----'); + var expected = '--------------x----'; + + expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected); + }); + + it('should not timeout if source completes within timeout period', function () { + var e1 = hot('-----|'); + var e2 = cold('----x----'); + var expected = '-----|'; + + expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected); + }); + + it('should not timeout if source raises error within timeout period', function () { + var e1 = hot('-----#'); + var e2 = cold('----x----|'); + var expected = '-----#'; + + expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected); + }); + + it('should not timeout if source emits within timeout period', function() { + var e1 = hot('--a--b--c--d--e--|'); + var e2 = cold('----x----|'); + var expected = '--a--b--c--d--e--|'; + + expectObservable(e1.timeoutWith(50, e2, rxTestScheduler)).toBe(expected); + }); + + it('should timeout after specified Date then subscribe to the passed observable', function(done) { + var e1 = Observable.interval(40).take(5); + var e2 = Observable.of(100); + + var res = []; + e1.timeoutWith(new Date(Date.now() + 100), e2) + .subscribe(function (x) { + res.push(x); + }, function(x) { + throw 'should not be called'; + }, function() { + expect(res).toEqual([0, 1, 100]); + done(); + }); }, 2000); + + it('should not timeout if source completes within specified Date', function() { + var e1 = hot('--a--b--c--d--e--|'); + var e2 = cold('--x--|'); + var expected = '--a--b--c--d--e--|'; + + var timeoutValue = new Date(Date.now() + (expected.length + 2) * 10); + + expectObservable(e1.timeoutWith(timeoutValue, e2, rxTestScheduler)).toBe(expected); + }); + + it('should not timeout if source raises error within specified Date', function() { + var e1 = hot('---a---#'); + var e2 = cold('--x--|'); + var expected = '---a---#'; + + expectObservable(e1.timeoutWith(new Date(Date.now() + 100), e2, rxTestScheduler)).toBe(expected); + }); + + it('should timeout specified Date after specified Date then never completes if other source does not complete', function() { + var e1 = hot('---a---b---c---d---e---|'); + var e2 = cold('-') + var expected = '---a---b--'; + + expectObservable(e1.timeoutWith(new Date(Date.now() + 100), e2, rxTestScheduler)).toBe(expected); + }); }); \ No newline at end of file diff --git a/src/operators/timeout.ts b/src/operators/timeout.ts index 341433971d..9d792f6149 100644 --- a/src/operators/timeout.ts +++ b/src/operators/timeout.ts @@ -7,34 +7,72 @@ import Subscription from '../Subscription'; import isDate from '../util/isDate'; export default function timeout(due: number|Date, errorToSend: any = null, scheduler: Scheduler = immediate) { - let waitFor = isDate(due) ? (+due - Date.now()) : due; - return this.lift(new TimeoutOperator(waitFor, errorToSend, scheduler)); + let absoluteTimeout = isDate(due); + let waitFor = absoluteTimeout ? (+due - Date.now()) : due; + + return this.lift(new TimeoutOperator(waitFor, absoluteTimeout, errorToSend, scheduler)); } class TimeoutOperator implements Operator { - constructor(private waitFor: number, private errorToSend: any, private scheduler: Scheduler) { + constructor(private waitFor: number, private absoluteTimeout:boolean, private errorToSend: any, private scheduler: Scheduler) { } call(subscriber: Subscriber) { - return new TimeoutSubscriber(subscriber, this.waitFor, this.errorToSend, this.scheduler); + return new TimeoutSubscriber(subscriber, this.absoluteTimeout, this.waitFor, this.errorToSend, this.scheduler); } } class TimeoutSubscriber extends Subscriber { - timeoutSubscription: Subscription; + private index: number = 0; + private _previousIndex: number = 0; + get previousIndex():number { + return this._previousIndex; + } + private _hasCompleted: boolean = false; + get hasCompleted(): boolean { + return this._hasCompleted; + } - constructor(destination: Subscriber, private waitFor: number, private errorToSend: any, private scheduler: Scheduler) { + constructor(destination: Subscriber, private absoluteTimeout:boolean, private waitFor: number, private errorToSend: any, private scheduler: Scheduler) { super(destination); - let delay = waitFor; - scheduler.schedule(dispatchTimeout, delay, { subscriber: this }); + this.scheduleTimeout(); + } + + private static dispatchTimeout(state: any): void{ + const source = state.subscriber; + const currentIndex = state.index; + + if (!source.completed && source.previousIndex === currentIndex) { + source.notifyTimeout(); + } + } + + private scheduleTimeout():void { + let currentIndex = this.index; + this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex }); + this.index++; + this._previousIndex = currentIndex; } - sendTimeoutError() { + _next(value: T) { + this.destination.next(value); + + if (!this.absoluteTimeout) { + this.scheduleTimeout(); + } + } + + _error(err) { + this.destination.error(err); + this._hasCompleted = true; + } + + _complete() { + this.destination.complete(); + this._hasCompleted = true; + } + + notifyTimeout() { this.error(this.errorToSend || new Error('timeout')); } -} - -function dispatchTimeout(state: { subscriber: TimeoutSubscriber }) { - const subscriber = state.subscriber; - subscriber.sendTimeoutError(); } \ No newline at end of file diff --git a/src/operators/timeoutWith.ts b/src/operators/timeoutWith.ts index 7cb38ca929..c4f5902002 100644 --- a/src/operators/timeoutWith.ts +++ b/src/operators/timeoutWith.ts @@ -6,37 +6,85 @@ import immediate from '../schedulers/immediate'; import Subscription from '../Subscription'; import Observable from '../Observable'; import isDate from '../util/isDate'; +import OuterSubscriber from '../OuterSubscriber'; +import subscribeToResult from '../util/subscribeToResult'; export default function timeoutWith(due: number|Date, withObservable: Observable, scheduler: Scheduler = immediate) { - let waitFor = isDate(due) ? (+due - Date.now()) : due; - return this.lift(new TimeoutWithOperator(waitFor, withObservable, scheduler)); + let absoluteTimeout = isDate(due); + let waitFor = absoluteTimeout ? (+due - Date.now()) : due; + + return this.lift(new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler)); } class TimeoutWithOperator implements Operator { - constructor(private waitFor: number, private withObservable: Observable, private scheduler: Scheduler) { + constructor(private waitFor: number, private absoluteTimeout:boolean, private withObservable: Observable, private scheduler: Scheduler) { } call(subscriber: Subscriber) { - return new TimeoutWithSubscriber(subscriber, this.waitFor, this.withObservable, this.scheduler); + return new TimeoutWithSubscriber(subscriber, this.absoluteTimeout, this.waitFor, this.withObservable, this.scheduler); } } -class TimeoutWithSubscriber extends Subscriber { - timeoutSubscription: Subscription; +class TimeoutWithSubscriber extends OuterSubscriber { + private timeoutSubscription: Subscription = undefined; + private timedOut: boolean = false; + private index: number = 0; + private _previousIndex: number = 0; + get previousIndex():number { + return this._previousIndex; + } + private _hasCompleted: boolean = false; + get hasCompleted(): boolean { + return this._hasCompleted; + } - constructor(destination: Subscriber, private waitFor: number, private withObservable: Observable, private scheduler: Scheduler) { + constructor(destination: Subscriber, private absoluteTimeout:boolean, private waitFor: number, private withObservable: Observable, private scheduler: Scheduler) { super(destination); - let delay = waitFor; - scheduler.schedule(dispatchTimeout, delay, { subscriber: this }); + this.scheduleTimeout(); } - handleTimeout() { - const withObservable = this.withObservable; - this.add(withObservable.subscribe(this)); + private static dispatchTimeout(state: any): void{ + const source = state.subscriber; + const currentIndex = state.index; + + if (!source.completed && source.previousIndex === currentIndex) { + source.handleTimeout(); + } + } + + private scheduleTimeout():void { + let currentIndex = this.index; + this.scheduler.schedule(TimeoutWithSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex }); + this.index++; + this._previousIndex = currentIndex; } -} -function dispatchTimeout(state: { subscriber: TimeoutWithSubscriber }) { - const subscriber = state.subscriber; - subscriber.handleTimeout(); + _next(value: T) { + if (!this.timedOut) { + this.destination.next(value); + if (!this.absoluteTimeout) { + this.scheduleTimeout(); + } + } + } + + _error(err) { + if (!this.timedOut) { + this.destination.error(err); + this._hasCompleted = true; + } + } + + _complete() { + if (!this.timedOut) { + this.destination.complete(); + this._hasCompleted = true; + } + } + + handleTimeout(): void { + const withObservable = this.withObservable; + this.timedOut = true; + this.add(this.timeoutSubscription = subscribeToResult(this, withObservable)); + } } \ No newline at end of file