-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(operator): add timeout and timeoutWith
adds two operators timeout and timeoutWith. The former is for sending errors on timeout the latter is for continuing with an Observable on timeout. closes #244
- Loading branch information
Showing
7 changed files
with
156 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
/* globals describe, it, expect */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.timeout', function () { | ||
it('should timeout after a specified delay', function (done) { | ||
Observable.never().timeout(100) | ||
.subscribe(function (x) { | ||
throw 'should not next'; | ||
}, function (err) { | ||
expect(err.message).toBe('timeout'); | ||
done(); | ||
}, function () { | ||
throw 'should not complete'; | ||
}); | ||
}, 2000); | ||
|
||
it('should timeout after a delay and send the passed error', function (done) { | ||
Observable.never().timeout(100, 'hello') | ||
.subscribe(function () { | ||
throw 'should not next'; | ||
}, function (err) { | ||
expect(err).toBe('hello'); | ||
done(); | ||
}, function () { | ||
throw 'should not complete'; | ||
}) | ||
}); | ||
|
||
|
||
it('should timeout at a specified Date', function (done) { | ||
var date = new Date(Date.now() + 100); | ||
|
||
Observable.never().timeout(date) | ||
.subscribe(function (x) { | ||
throw 'should not next'; | ||
}, function (err) { | ||
expect(err.message).toBe('timeout'); | ||
done(); | ||
}, function () { | ||
throw 'should not complete'; | ||
}); | ||
}, 2000); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* globals describe, it, expect */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.timeoutWith', function () { | ||
it('should timeout after a specified delay then subscribe to the passed observable', function (done) { | ||
var expected = [1, 2, 3]; | ||
Observable.never().timeoutWith(100, Observable.of(1,2,3)) | ||
.subscribe(function (x) { | ||
expect(x).toBe(expected.shift()); | ||
}, null, done); | ||
}, 2000); | ||
|
||
|
||
it('should timeout at a specified date then subscribe to the passed observable', function (done) { | ||
var expected = [1, 2, 3]; | ||
var date = new Date(Date.now() + 100); | ||
Observable.never().timeoutWith(date, Observable.of(1,2,3)) | ||
.subscribe(function (x) { | ||
expect(x).toBe(expected.shift()); | ||
}, null, done); | ||
}, 2000); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
import Operator from '../Operator'; | ||
import Observer from '../Observer'; | ||
import Subscriber from '../Subscriber'; | ||
import Scheduler from '../Scheduler'; | ||
import Subscription from '../Subscription'; | ||
import isDate from '../util/isDate'; | ||
|
||
export default function timeout(due: number|Date, errorToSend: any = null, scheduler: Scheduler = Scheduler.immediate) { | ||
let waitFor = isDate(due) ? (+due - Date.now()) : <number>due; | ||
return this.lift(new TimeoutOperator(waitFor, errorToSend, scheduler)); | ||
} | ||
|
||
class TimeoutOperator<T, R> implements Operator<T, R> { | ||
constructor(private waitFor: number, private errorToSend: any, private scheduler: Scheduler) { | ||
} | ||
|
||
call(observer: Observer<R>) { | ||
return new TimeoutSubscriber(observer, this.waitFor, this.errorToSend, this.scheduler); | ||
} | ||
} | ||
|
||
class TimeoutSubscriber<T> extends Subscriber<T> { | ||
timeoutSubscription: Subscription<any>; | ||
|
||
constructor(destination: Observer<T>, private waitFor: number, private errorToSend: any, private scheduler: Scheduler) { | ||
super(destination); | ||
let delay = waitFor; | ||
scheduler.schedule(delay, { subscriber: this }, dispatchTimeout); | ||
} | ||
|
||
sendTimeoutError() { | ||
this.error(this.errorToSend || new Error('timeout')); | ||
} | ||
} | ||
|
||
function dispatchTimeout<T>(state: { subscriber: TimeoutSubscriber<T> }) { | ||
const subscriber = state.subscriber; | ||
subscriber.sendTimeoutError(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import Operator from '../Operator'; | ||
import Observer from '../Observer'; | ||
import Subscriber from '../Subscriber'; | ||
import Scheduler from '../Scheduler'; | ||
import Subscription from '../Subscription'; | ||
import Observable from '../Observable'; | ||
import isDate from '../util/isDate'; | ||
|
||
export default function timeoutWith(due: number|Date, withObservable: Observable<any>, scheduler: Scheduler = Scheduler.immediate) { | ||
let waitFor = isDate(due) ? (+due - Date.now()) : <number>due; | ||
return this.lift(new TimeoutWithOperator(waitFor, withObservable, scheduler)); | ||
} | ||
|
||
class TimeoutWithOperator<T, R> implements Operator<T, R> { | ||
constructor(private waitFor: number, private withObservable: Observable<any>, private scheduler: Scheduler) { | ||
} | ||
|
||
call(observer: Observer<R>) { | ||
return new TimeoutWithSubscriber(observer, this.waitFor, this.withObservable, this.scheduler); | ||
} | ||
} | ||
|
||
class TimeoutWithSubscriber<T> extends Subscriber<T> { | ||
timeoutSubscription: Subscription<any>; | ||
|
||
constructor(destination: Observer<T>, private waitFor: number, private withObservable: Observable<any>, private scheduler: Scheduler) { | ||
super(destination); | ||
let delay = waitFor; | ||
scheduler.schedule(delay, { subscriber: this }, dispatchTimeout); | ||
} | ||
|
||
handleTimeout() { | ||
const withObservable = this.withObservable; | ||
this.add(withObservable.subscribe(this)); | ||
} | ||
} | ||
|
||
function dispatchTimeout<T>(state: { subscriber: TimeoutWithSubscriber<T> }) { | ||
const subscriber = state.subscriber; | ||
subscriber.handleTimeout(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
export default function isDate(value) { | ||
return value instanceof Date && !isNaN(+value); | ||
} |