-
-
Notifications
You must be signed in to change notification settings - Fork 139
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(throttle): add throttle extra operator
- Loading branch information
Showing
2 changed files
with
140 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
import {Operator, Stream} from '../core'; | ||
|
||
class ThrottleOperator<T> implements Operator<T, T> { | ||
public type = 'throttle'; | ||
public out: Stream<T> = null; | ||
private value: T = null; | ||
private id: any = null; | ||
|
||
constructor(public dt: number, | ||
public ins: Stream<T>) { | ||
} | ||
|
||
_start(out: Stream<T>): void { | ||
this.out = out; | ||
this.ins._add(this); | ||
} | ||
|
||
_stop(): void { | ||
this.ins._remove(this); | ||
this.out = null; | ||
this.value = null; | ||
this.id = null; | ||
} | ||
|
||
clearInterval() { | ||
const id = this.id; | ||
if (id !== null) { | ||
clearInterval(id); | ||
} | ||
this.id = null; | ||
} | ||
|
||
_n(t: T) { | ||
const u = this.out; | ||
if (!u) return; | ||
this.value = t; | ||
if (this.id) return; | ||
u._n(t); | ||
this.id = setInterval(() => { | ||
this.clearInterval(); | ||
}, this.dt); | ||
} | ||
|
||
_e(err: any) { | ||
const u = this.out; | ||
if (!u) return; | ||
this.clearInterval(); | ||
u._e(err); | ||
} | ||
|
||
_c() { | ||
const u = this.out; | ||
if (!u) return; | ||
this.clearInterval(); | ||
u._c(); | ||
} | ||
} | ||
|
||
/** | ||
* Emits event and drops subsequent events until a certain amount of silence has passed. | ||
* | ||
* Marble diagram: | ||
* | ||
* ```text | ||
* --1-2-----3--4----5| | ||
* throttle(60) | ||
* --1-------3-------5-| | ||
* ``` | ||
* | ||
* Example: | ||
* | ||
* ```js | ||
* import fromDiagram from 'xstream/extra/fromDiagram' | ||
* import throttle from 'xstream/extra/throttle' | ||
* | ||
* const stream = fromDiagram(--1-2-----3--4----5|) | ||
* .compose(throttle(60)) | ||
* | ||
* stream.addListener({ | ||
* next: i => console.log(i), | ||
* error: err => console.error(err), | ||
* complete: () => console.log('completed') | ||
* }) | ||
* ``` | ||
* | ||
* ```text | ||
* > 1 | ||
* > 3 | ||
* > 5 | ||
* > completed | ||
* ``` | ||
* | ||
* @param {number} period The amount of silence required in milliseconds. | ||
* @return {Stream} | ||
*/ | ||
export default function throttle(period: number): <T>(ins: Stream<T>) => Stream<T> { | ||
return function throttleOperator<T>(ins: Stream<T>) { | ||
return new Stream<T>(new ThrottleOperator(period, ins)); | ||
}; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/// <reference path="../../typings/globals/mocha/index.d.ts" /> | ||
/// <reference path="../../typings/globals/node/index.d.ts" /> | ||
import xs, {Listener, Producer} from '../../src/index'; | ||
import throttle from '../../src/extra/throttle'; | ||
import * as assert from 'assert'; | ||
|
||
describe('throttle (extra)', () => { | ||
it('should emit event and drop subsequent events until a period of silence has passed', (done) => { | ||
const producer: Producer<number> = { | ||
start(out: Listener<number>) { | ||
out.next(1); | ||
out.next(2); | ||
setTimeout(() => { | ||
out.next(5); | ||
setTimeout(() => { | ||
out.next(7); | ||
setTimeout(() => { | ||
out.next(9); | ||
}, 150) | ||
}, 20) | ||
}, 200) | ||
}, | ||
stop() { } | ||
}; | ||
const stream = xs.create(producer).compose(throttle(100)); | ||
const expected = [1, 5, 9]; | ||
let listener = { | ||
next: (x: number) => { | ||
assert.equal(x, expected.shift()); | ||
if (expected.length === 0) { | ||
stream.removeListener(listener); | ||
done(); | ||
} | ||
}, | ||
error: (err: Error) => done(err), | ||
complete: () => done(new Error('This should not be called')), | ||
}; | ||
stream.addListener(listener); | ||
}); | ||
}); |