Utilities
Here is a quick overview of the utility functions provided by the streamlets package:
pipe() passes each given value to the next one as its argument. It will hopefully be deprecated if and when the pipeline operator lands.
pipe(a, b, c) === c(b(a))
pipe(
  interval(1000),
  tap(console.log),
  observe
)
// is the same as
observe(tap(interval(1000), console.log))
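The equivalence shown above can be reproduced in a few lines of plain JavaScript. The following is only a minimal sketch of the idea, not the library's actual implementation; `pipeSketch`, `inc` and `double` are hypothetical names used for illustration.

```javascript
// Hypothetical stand-in for pipe(): feeds the first value through each
// following function, left to right.
const pipeSketch = (first, ...fns) => fns.reduce((acc, fn) => fn(acc), first)

const inc = x => x + 1
const double = x => x * 2

// pipeSketch(a, b, c) === c(b(a))
console.log(pipeSketch(1, inc, double) === double(inc(1))) // true
```

Called with a single argument, the sketch simply returns that argument unchanged.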
connect() connects a source and a sink. It can be used to plug custom sinks into pipelines.
pipe(
  src,
  // ...
  connect(myCustomSink)
)
first() returns a promise that resolves with the first value the source emits (from now).
await first(src)
last() returns a promise that resolves with the last value the source emits before the stream ends.
await last(src)
nth() returns a promise that resolves with the nth value the source emits (from now).
await nth(src, 2)
await pipe(src, nth(2))
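To illustrate how a promise-returning helper of this kind might sit on top of the source and sink protocol, here is a self-contained sketch. It does not use the library: `fromArray` and `nthSketch` are hypothetical, the miniature protocol only mimics the greet / receive / end and start / request / stop calls that appear in the examples on this page, and counting from 1 is an assumption.

```javascript
// Hypothetical miniature source: emits the next item each time the sink
// requests one, and ends when the items run out.
const fromArray = items => ({
  connect(sink) {
    let i = 0
    let stopped = false
    sink.greet({
      start() { this.request() },
      request() {
        if (stopped) return
        if (i < items.length) sink.receive(items[i++])
        else { stopped = true; sink.end() }
      },
      stop() { stopped = true },
    })
  },
})

// Sketch of an nth()-like helper: resolves with the n-th emission
// (assumed 1-based), then stops the source. Rejects if the source ends early.
const nthSketch = (src, n) => new Promise((resolve, reject) => {
  let seen = 0
  let talkback
  src.connect({
    greet(t) { (talkback = t).start() },
    receive(value) {
      if (++seen === n) { talkback.stop(); resolve(value) }
      else talkback.request()
    },
    end() { reject(new Error('source ended before emitting enough values')) },
  })
})

nthSketch(fromArray(['a', 'b', 'c']), 2).then(console.log) // logs 'b'
```

In this sketch, a first()-like helper would be the n = 1 case, and a last()-like helper would keep the most recent value and resolve in `end()` instead.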
next() returns an async iterator for looping over emissions of the source using for await.
for await (const i of next(src)) {
  // ...
}
source() creates a custom source using the given greeting function.
const helloWorldSrc = source(sink => {
  sink.greet(talkback({
    start() {
      sink.receive('Hello World!')
      sink.end()
    }
  }))
})
sink() creates a custom sink.
const pull2 = () => {
  let count = 0
  let talkback
  return sink({
    greet: t => (talkback = t).start(),
    receive: () => (++count === 2) ? talkback.stop() : talkback.request()
  })
}
talkback() creates a custom talkback, useful for creating custom sources.
const randomSrc = source(sink => {
  sink.greet(talkback({
    request: () => sink.receive(Math.random())
  }))
})
transform() creates a custom transform. Its main job is to auto-curry transform functions so that they can be used inside pipe().
// _mul is not pipeable
const _mul = (src, n) => map(src, x => x * n)
// mul is pipeable
const mul = transform(_mul)
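The auto-currying idea can be sketched without the library. The version below is an assumption about the general technique, not the package's actual logic: `transformSketch` is a hypothetical helper that decides by argument count, and plain arrays stand in for sources so the example runs on its own.

```javascript
// Hypothetical stand-in for transform(): if the wrapped function receives
// fewer arguments than it declares, return a function that waits for the
// missing first argument (the source).
const transformSketch = fn => (...args) =>
  args.length >= fn.length ? fn(...args) : src => fn(src, ...args)

// Arrays stand in for sources here, to keep the sketch self-contained.
const _mul = (src, n) => src.map(x => x * n)
const mul = transformSketch(_mul)

console.log(mul([1, 2, 3], 2)) // [ 2, 4, 6 ]
console.log(mul(2)([1, 2, 3])) // [ 2, 4, 6 ]
```

The second call form is the one that makes a transform usable as a step in a pipeline, since each step receives the previous value as its only argument.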
wait() is like setTimeout(), but it also accepts promises (invoking the callback when the promise resolves) and sources (invoking the callback when the source emits).
// says hi in a second
wait(() => console.log('HALO'), 1000)
// says hi when `asyncTask()` is done
wait(() => console.log('HALO'), asyncTask())
// says hi when `source` emits its next value
wait(() => console.log('HALO'), source)
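The dispatch on the second argument can be sketched as follows. This is a simplified illustration, not the library's code: `waitSketch` is a hypothetical name, and it covers only the number and promise cases, leaving out sources.

```javascript
// Hypothetical stand-in for wait(), covering numbers and promises only.
const waitSketch = (callback, trigger) => {
  if (typeof trigger === 'number') {
    // Plain delay in milliseconds, as with setTimeout().
    setTimeout(callback, trigger)
  } else if (trigger && typeof trigger.then === 'function') {
    // Promise (or any thenable): run the callback once it resolves.
    trigger.then(() => callback())
  } else {
    throw new TypeError('unsupported trigger')
  }
}

waitSketch(() => console.log('timer fired'), 10)
waitSketch(() => console.log('promise resolved'), Promise.resolve())
```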