Skip to content

Commit

Permalink
feat(throttle): add throttle extra operator
Browse files Browse the repository at this point in the history
  • Loading branch information
wclr authored and staltz committed Oct 17, 2016
1 parent c96ff10 commit 8b5c211
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 0 deletions.
100 changes: 100 additions & 0 deletions src/extra/throttle.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import {Operator, Stream} from '../core';

class ThrottleOperator<T> implements Operator<T, T> {
public type = 'throttle';
public out: Stream<T> = null;
private value: T = null;
private id: any = null;

constructor(public dt: number,
public ins: Stream<T>) {
}

_start(out: Stream<T>): void {
this.out = out;
this.ins._add(this);
}

_stop(): void {
this.ins._remove(this);
this.out = null;
this.value = null;
this.id = null;
}

clearInterval() {
const id = this.id;
if (id !== null) {
clearInterval(id);
}
this.id = null;
}

_n(t: T) {
const u = this.out;
if (!u) return;
this.value = t;
if (this.id) return;
u._n(t);
this.id = setInterval(() => {
this.clearInterval();
}, this.dt);
}

_e(err: any) {
const u = this.out;
if (!u) return;
this.clearInterval();
u._e(err);
}

_c() {
const u = this.out;
if (!u) return;
this.clearInterval();
u._c();
}
}

/**
* Emits event and drops subsequent events until a certain amount of silence has passed.
*
* Marble diagram:
*
* ```text
* --1-2-----3--4----5|
* throttle(60)
* --1-------3-------5-|
* ```
*
* Example:
*
* ```js
* import fromDiagram from 'xstream/extra/fromDiagram'
* import throttle from 'xstream/extra/throttle'
*
* const stream = fromDiagram(--1-2-----3--4----5|)
* .compose(throttle(60))
*
* stream.addListener({
* next: i => console.log(i),
* error: err => console.error(err),
* complete: () => console.log('completed')
* })
* ```
*
* ```text
* > 1
* > 3
* > 5
* > completed
* ```
*
* @param {number} period The amount of silence required in milliseconds.
* @return {Stream}
*/
export default function throttle(period: number): <T>(ins: Stream<T>) => Stream<T> {
return function throttleOperator<T>(ins: Stream<T>) {
return new Stream<T>(new ThrottleOperator(period, ins));
};
}
40 changes: 40 additions & 0 deletions tests/extra/throttle.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs, {Listener, Producer} from '../../src/index';
import throttle from '../../src/extra/throttle';
import * as assert from 'assert';

describe('throttle (extra)', () => {
it('should emit event and drop subsequent events until a period of silence has passed', (done) => {
const producer: Producer<number> = {
start(out: Listener<number>) {
out.next(1);
out.next(2);
setTimeout(() => {
out.next(5);
setTimeout(() => {
out.next(7);
setTimeout(() => {
out.next(9);
}, 150)
}, 20)
}, 200)
},
stop() { }
};
const stream = xs.create(producer).compose(throttle(100));
const expected = [1, 5, 9];
let listener = {
next: (x: number) => {
assert.equal(x, expected.shift());
if (expected.length === 0) {
stream.removeListener(listener);
done();
}
},
error: (err: Error) => done(err),
complete: () => done(new Error('This should not be called')),
};
stream.addListener(listener);
});
});

0 comments on commit 8b5c211

Please sign in to comment.