diff --git a/packages/transducers/src/xform/drop-nth.ts b/packages/transducers/src/xform/drop-nth.ts index e32bfcf2b5..7077118311 100644 --- a/packages/transducers/src/xform/drop-nth.ts +++ b/packages/transducers/src/xform/drop-nth.ts @@ -1,12 +1,12 @@ -import { Reducer, Transducer } from "../api"; -import { compR } from "../func/comp"; +import { Transducer } from "../api"; +import { throttle } from "./throttle"; export function dropNth(n: number): Transducer { - n--; - return (rfn: Reducer) => { - const r = rfn[2]; - let skip = n; - return compR(rfn, - (acc, x) => skip-- > 0 ? r(acc, x) : (skip = n, acc)); - }; + n = Math.max(0, n - 1); + return throttle( + () => { + let skip = n; + return () => skip-- > 0 ? true : (skip = n, false); + } + ); } diff --git a/packages/transducers/src/xform/take-nth.ts b/packages/transducers/src/xform/take-nth.ts index 334766baf7..8c10e447c7 100644 --- a/packages/transducers/src/xform/take-nth.ts +++ b/packages/transducers/src/xform/take-nth.ts @@ -1,19 +1,12 @@ -import { Reducer, Transducer } from "../api"; -import { compR } from "../func/comp"; +import { Transducer } from "../api"; +import { throttle } from "./throttle"; export function takeNth(n: number): Transducer { - n--; - return (rfn: Reducer) => { - const r = rfn[2]; - let skip = 0; - return compR(rfn, - (acc, x) => { - if (skip === 0) { - skip = n; - return r(acc, x); - } - skip--; - return acc; - }); - }; + n = Math.max(0, n - 1); + return throttle( + () => { + let skip = 0; + return () => (skip === 0 ? (skip = n, true) : (skip-- , false)); + } + ); } diff --git a/packages/transducers/src/xform/throttle.ts b/packages/transducers/src/xform/throttle.ts index 37064fdec2..84d78ea1f0 100644 --- a/packages/transducers/src/xform/throttle.ts +++ b/packages/transducers/src/xform/throttle.ts @@ -1,18 +1,49 @@ +import { Predicate } from "@thi.ng/api/api"; + import { Reducer, Transducer } from "../api"; import { compR } from "../func/comp"; -export function throttle(delay: number): Transducer { +/** + * Similar to `filter`, but works with possibly stateful predicates + * to achieve rate limiting capabilities. Emits only values when + * predicate returns a truthy value. + * + * To support multiple instances of stateful predicates, the predicate + * itself must be wrapped in a no-arg function, which is called when + * the transducer initializes. Any stateful initialization of the + * predicate MUST be done in this function and the function MUST + * return a 1-arg function, the actual predicate applied to each value. + * + * Also see: `throttleTime()`. + * + * @param pred + */ +export function throttle(pred: () => Predicate): Transducer { return (rfn: Reducer) => { const r = rfn[2]; - let last = 0; + const _pred = pred(); return compR(rfn, - (acc, x) => { - const t = Date.now(); - if (t - last >= delay) { - last = t; - acc = r(acc, x); - } - return acc; - }); + (acc, x) => _pred(x) ? r(acc, x) : acc); }; } + +/** + * Time-based version of `throttle`. Ignores any new values in the + * `delay` interval since the last accepted value. + * + * **Only to be used in async contexts and NOT with `transduce` directly.** + * + * Also see: `@thi.ng/rstream` and `@thi.ng/csp` packages. + * + * @param delay + */ +export function throttleTime(delay: number): Transducer { + return throttle( + () => { + let last = 0; + return () => { + const t = Date.now(); + return t - last >= delay ? (last = t, true) : false; + }; + }); +}