Skip to content

Commit

Permalink
fix(timeout): update behavior of timeout, timeoutWith
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj authored and benlesh committed Oct 2, 2015
1 parent d5120d2 commit 16bd691
Show file tree
Hide file tree
Showing 4 changed files with 288 additions and 80 deletions.
79 changes: 44 additions & 35 deletions spec/operators/timeout-spec.js
Original file line number Diff line number Diff line change
@@ -1,44 +1,53 @@
/* globals describe, it, expect */
/* globals describe, it, expect, expectObservable, hot, rxTestScheduler */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.timeout', function () {
it('should timeout after a specified delay', function (done) {
Observable.never().timeout(100)
.subscribe(function (x) {
throw 'should not next';
}, function (err) {
expect(err.message).toBe('timeout');
done();
}, function () {
throw 'should not complete';
});
}, 2000);
describe('Observable.prototype.timeout()', function () {
var defaultTimeoutError = new Error('timeout');

it('should timeout after a delay and send the passed error', function (done) {
Observable.never().timeout(100, 'hello')
.subscribe(function () {
throw 'should not next';
}, function (err) {
expect(err).toBe('hello');
done();
}, function () {
throw 'should not complete';
})
it('should timeout after a specified timeout period', function () {
var e1 = Observable.never();
var expected = '-----#';

expectObservable(e1.timeout(50, null, rxTestScheduler)).toBe(expected, null, defaultTimeoutError);
});

it('should timeout after specified timeout period and send the passed error', function () {
var e1 = Observable.never();
var expected = '-----#';
var value = 'hello';

expectObservable(e1.timeout(50, value, rxTestScheduler)).toBe(expected, null, value);
});

it('should not timeout if source completes within absolute timeout period', function() {
var e1 = hot('--a--b--c--d--e--|');
var expected = '--a--b--c--d--e--|';

var timeoutValue = new Date(Date.now() + (expected.length + 2) * 10);

expectObservable(e1.timeout(timeoutValue, null, rxTestScheduler)).toBe(expected);
});

it('should not timeout if source emits within timeout period', function() {
var e1 = hot('--a--b--c--d--e--|');
var expected = '--a--b--c--d--e--|';

expectObservable(e1.timeout(50, null, rxTestScheduler)).toBe(expected);
});

it('should timeout at a specified Date', function (done) {
var date = new Date(Date.now() + 100);
it('should timeout after a specified timeout period between emit with default error while source emits', function () {
var e1 = hot('---a---b---c------d---e---|');
var expected = '---a---b---c----#';

Observable.never().timeout(date)
.subscribe(function (x) {
throw 'should not next';
}, function (err) {
expect(err.message).toBe('timeout');
done();
}, function () {
throw 'should not complete';
});
}, 2000);
expectObservable(e1.timeout(50, null, rxTestScheduler)).toBe(expected, {a: 'a', b: 'b', c: 'c'}, defaultTimeoutError);
});

it('should timeout after a specified delay with passed error while source emits', function () {
var value = 'hello';
var e1 = hot('---a---b---c------d---e---|');
var expected = '---a---b---c----#';

expectObservable(e1.timeout(50, value, rxTestScheduler)).toBe(expected, {a: 'a', b: 'b', c: 'c'}, value);
});
});
143 changes: 128 additions & 15 deletions spec/operators/timeoutWith-spec.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,136 @@
/* globals describe, it, expect */
/* globals describe, it, expect, expectObservable, hot, cold, rxTestScheduler */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.timeoutWith', function () {
it('should timeout after a specified delay then subscribe to the passed observable', function (done) {
var expected = [1, 2, 3];
Observable.never().timeoutWith(100, Observable.of(1,2,3))
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
describe('Observable.prototype.timeoutWith()', function () {
it('should timeout after a specified period then subscribe to the passed observable', function () {
var e1 = Observable.never();
var e2 = cold('--x--y--z--|');
var expected = '-------x--y--z--|';

expectObservable(e1.timeoutWith(50, e2, rxTestScheduler)).toBe(expected);
});

it('should timeout at a specified date then subscribe to the passed observable', function (done) {
var expected = ['x', 'y', 'z'];
var e1 = Observable.never();
var e2 = Observable.fromArray(expected);

var res = [];
e1.timeoutWith(new Date(Date.now() + 100), e2)
.subscribe(function (x) {
res.push(x);
}, function(x) {
throw 'should not be called';
}, function() {
expect(res).toEqual(expected);
done();
});
}, 2000);

it('should timeout after a specified period between emit then subscribe to the passed observable when source emits', function () {
var e1 = hot('---a---b------c---|');
var e2 = cold('-x-y-|');
var expected = '---a---b----x-y-|';

expectObservable(e1.timeoutWith(40, e2, rxTestScheduler)).toBe(expected);
});

it('should timeout at a specified date then subscribe to the passed observable', function (done) {
var expected = [1, 2, 3];
var date = new Date(Date.now() + 100);
Observable.never().timeoutWith(date, Observable.of(1,2,3))
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
it('should timeout after a specified period then subscribe to the passed observable when source is empty', function () {
var e1 = hot('-------------|');
var e2 = cold('----x----|');
var expected = '--------------x----|';

expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected);
});

it('should timeout after a specified period between emit then never completes if other source does not complete', function () {
var e1 = hot('--a--b--------c--d--|');
var e2 = cold('-');
var expected = '--a--b----';

expectObservable(e1.timeoutWith(40, e2, rxTestScheduler)).toBe(expected);
});

it('should timeout after a specified period then subscribe to the passed observable when source raises error after timeout', function () {
var e1 = hot('-------------#');
var e2 = cold('----x----|');
var expected = '--------------x----|';

expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected);
});

it('should timeout after a specified period between emit then never completes if other source emits but not complete', function () {
var e1 = hot('-------------|');
var e2 = cold('----x----');
var expected = '--------------x----';

expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected);
});

it('should not timeout if source completes within timeout period', function () {
var e1 = hot('-----|');
var e2 = cold('----x----');
var expected = '-----|';

expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected);
});

it('should not timeout if source raises error within timeout period', function () {
var e1 = hot('-----#');
var e2 = cold('----x----|');
var expected = '-----#';

expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected);
});

it('should not timeout if source emits within timeout period', function() {
var e1 = hot('--a--b--c--d--e--|');
var e2 = cold('----x----|');
var expected = '--a--b--c--d--e--|';

expectObservable(e1.timeoutWith(50, e2, rxTestScheduler)).toBe(expected);
});

it('should timeout after specified Date then subscribe to the passed observable', function(done) {
var e1 = Observable.interval(40).take(5);
var e2 = Observable.of(100);

var res = [];
e1.timeoutWith(new Date(Date.now() + 100), e2)
.subscribe(function (x) {
res.push(x);
}, function(x) {
throw 'should not be called';
}, function() {
expect(res).toEqual([0, 1, 100]);
done();
});
}, 2000);

it('should not timeout if source completes within specified Date', function() {
var e1 = hot('--a--b--c--d--e--|');
var e2 = cold('--x--|');
var expected = '--a--b--c--d--e--|';

var timeoutValue = new Date(Date.now() + (expected.length + 2) * 10);

expectObservable(e1.timeoutWith(timeoutValue, e2, rxTestScheduler)).toBe(expected);
});

it('should not timeout if source raises error within specified Date', function() {
var e1 = hot('---a---#');
var e2 = cold('--x--|');
var expected = '---a---#';

expectObservable(e1.timeoutWith(new Date(Date.now() + 100), e2, rxTestScheduler)).toBe(expected);
});

it('should timeout specified Date after specified Date then never completes if other source does not complete', function() {
var e1 = hot('---a---b---c---d---e---|');
var e2 = cold('-')
var expected = '---a---b--';

expectObservable(e1.timeoutWith(new Date(Date.now() + 100), e2, rxTestScheduler)).toBe(expected);
});
});
66 changes: 52 additions & 14 deletions src/operators/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,72 @@ import Subscription from '../Subscription';
import isDate from '../util/isDate';

export default function timeout(due: number|Date, errorToSend: any = null, scheduler: Scheduler = immediate) {
let waitFor = isDate(due) ? (+due - Date.now()) : <number>due;
return this.lift(new TimeoutOperator(waitFor, errorToSend, scheduler));
let absoluteTimeout = isDate(due);
let waitFor = absoluteTimeout ? (+due - Date.now()) : <number>due;

return this.lift(new TimeoutOperator(waitFor, absoluteTimeout, errorToSend, scheduler));
}

class TimeoutOperator<T, R> implements Operator<T, R> {
constructor(private waitFor: number, private errorToSend: any, private scheduler: Scheduler) {
constructor(private waitFor: number, private absoluteTimeout:boolean, private errorToSend: any, private scheduler: Scheduler) {
}

call(subscriber: Subscriber<R>) {
return new TimeoutSubscriber(subscriber, this.waitFor, this.errorToSend, this.scheduler);
return new TimeoutSubscriber(subscriber, this.absoluteTimeout, this.waitFor, this.errorToSend, this.scheduler);
}
}

class TimeoutSubscriber<T> extends Subscriber<T> {
timeoutSubscription: Subscription<any>;
private index: number = 0;
private _previousIndex: number = 0;
get previousIndex():number {
return this._previousIndex;
}
private _hasCompleted: boolean = false;
get hasCompleted(): boolean {
return this._hasCompleted;
}

constructor(destination: Subscriber<T>, private waitFor: number, private errorToSend: any, private scheduler: Scheduler) {
constructor(destination: Subscriber<T>, private absoluteTimeout:boolean, private waitFor: number, private errorToSend: any, private scheduler: Scheduler) {
super(destination);
let delay = waitFor;
scheduler.schedule(dispatchTimeout, delay, { subscriber: this });
this.scheduleTimeout();
}

private static dispatchTimeout(state: any): void{
const source = state.subscriber;
const currentIndex = state.index;

if (!source.completed && source.previousIndex === currentIndex) {
source.notifyTimeout();
}
}

private scheduleTimeout():void {
let currentIndex = this.index;
this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex });
this.index++;
this._previousIndex = currentIndex;
}

sendTimeoutError() {
_next(value: T) {
this.destination.next(value);

if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
}

_error(err) {
this.destination.error(err);
this._hasCompleted = true;
}

_complete() {
this.destination.complete();
this._hasCompleted = true;
}

notifyTimeout() {
this.error(this.errorToSend || new Error('timeout'));
}
}

function dispatchTimeout<T>(state: { subscriber: TimeoutSubscriber<T> }) {
const subscriber = state.subscriber;
subscriber.sendTimeoutError();
}
Loading

0 comments on commit 16bd691

Please sign in to comment.