Skip to content

Commit

Permalink
feat(transducers): update throttle(), refactor take/dropNth
Browse files Browse the repository at this point in the history
- throttle() requires stateful predicate now
- add throttleTime() as replacement for prev throttle() impl
- refactor takeNth()/dropNth() to use throttle()

BREAKING CHANGE: throttle() requires stateful predicate now
  • Loading branch information
postspectacular committed Jan 27, 2018
1 parent 0b3c786 commit e1a282c
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 35 deletions.
18 changes: 9 additions & 9 deletions packages/transducers/src/xform/drop-nth.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Reducer, Transducer } from "../api";
import { compR } from "../func/comp";
import { Transducer } from "../api";
import { throttle } from "./throttle";

export function dropNth<T>(n: number): Transducer<T, T> {
n--;
return (rfn: Reducer<any, T>) => {
const r = rfn[2];
let skip = n;
return compR(rfn,
(acc, x) => skip-- > 0 ? r(acc, x) : (skip = n, acc));
};
n = Math.max(0, n - 1);
return throttle(
() => {
let skip = n;
return () => skip-- > 0 ? true : (skip = n, false);
}
);
}
25 changes: 9 additions & 16 deletions packages/transducers/src/xform/take-nth.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
import { Reducer, Transducer } from "../api";
import { compR } from "../func/comp";
import { Transducer } from "../api";
import { throttle } from "./throttle";

export function takeNth<T>(n: number): Transducer<T, T> {
n--;
return (rfn: Reducer<any, T>) => {
const r = rfn[2];
let skip = 0;
return compR(rfn,
(acc, x) => {
if (skip === 0) {
skip = n;
return r(acc, x);
}
skip--;
return acc;
});
};
n = Math.max(0, n - 1);
return throttle(
() => {
let skip = 0;
return () => (skip === 0 ? (skip = n, true) : (skip-- , false));
}
);
}
51 changes: 41 additions & 10 deletions packages/transducers/src/xform/throttle.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,49 @@
import { Predicate } from "@thi.ng/api/api";

import { Reducer, Transducer } from "../api";
import { compR } from "../func/comp";

export function throttle<T>(delay: number): Transducer<T, T> {
/**
* Similar to `filter`, but works with possibly stateful predicates
* to achieve rate limiting capabilities. Emits only values when
* predicate returns a truthy value.
*
* To support multiple instances of stateful predicates, the predicate
* itself must be wrapped in a no-arg function, which is called when
* the transducer initializes. Any stateful initialization of the
* predicate MUST be done in this function and the function MUST
* return a 1-arg function, the actual predicate applied to each value.
*
* Also see: `throttleTime()`.
*
* @param pred
*/
export function throttle<T>(pred: () => Predicate<T>): Transducer<T, T> {
return (rfn: Reducer<any, T>) => {
const r = rfn[2];
let last = 0;
const _pred = pred();
return compR(rfn,
(acc, x) => {
const t = Date.now();
if (t - last >= delay) {
last = t;
acc = r(acc, x);
}
return acc;
});
(acc, x) => _pred(x) ? r(acc, x) : acc);
};
}

/**
* Time-based version of `throttle`. Ignores any new values in the
* `delay` interval since the last accepted value.
*
* **Only to be used in async contexts and NOT with `transduce` directly.**
*
* Also see: `@thi.ng/rstream` and `@thi.ng/csp` packages.
*
* @param delay
*/
export function throttleTime<T>(delay: number): Transducer<T, T> {
return throttle<T>(
() => {
let last = 0;
return () => {
const t = Date.now();
return t - last >= delay ? (last = t, true) : false;
};
});
}

0 comments on commit e1a282c

Please sign in to comment.