Getting Started
The simplest data stream is a source that is being observed (or iterated on):
import { interval, observe } from 'streamlets'
// this source emits every second
const src = interval(1000)
// let's observe the source
observe(src)
This code has no visible effect, since we observe the source without doing anything with its values. To receive the data from a source, we can tap into the stream:
import { interval, tap, observe } from 'streamlets'
const src = interval(1000)
// this still does nothing, as this tapped source
// is not observed yet.
const tapped = tap(src, x => console.log(x))
// now we'll have a log every second.
observe(tapped)
// > 0
// > 1
// > 2
// ...
observe() returns an Observation, which can be stopped and restarted at will:
const src = interval(1000)
const tapped = tap(src, x => console.log(x))
const obs = observe(tapped)
setTimeout(() => obs.stop(), 3000)
setTimeout(() => obs.start(), 5000)
// > 0
// > 1
// --> here we get a pause for 2 seconds
// > 2
// > 3
// ...
👉 DON'T forget to stop observations when you are done!
Streams typically hold onto some resources while they are being observed. For example, a source created with interval() keeps a timer running while it is being observed. If you do not call .stop() on the corresponding observation, the timer will keep running.
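To make the resource-holding point concrete, here is a toy model of the idea. It is a sketch, not how streamlets is implemented: `toyInterval`, `toyObserve`, and the `activeTimers` counter are hypothetical names introduced for illustration, and only the standard `setInterval` / `clearInterval` functions are real APIs.

```javascript
// Toy model only: these helpers are hypothetical and NOT part of streamlets.
let activeTimers = 0

// A toy "source": given a sink, it starts a timer and returns
// a function that releases that timer.
function toyInterval(ms) {
  let count = 0
  return sink => {
    const handle = setInterval(() => sink(count++), ms)
    activeTimers++
    return () => {
      clearInterval(handle)
      activeTimers--
    }
  }
}

// A toy observation: start() acquires the resource, stop() releases it.
function toyObserve(source, sink = () => {}) {
  let release = null
  const obs = {
    start() { if (!release) release = source(sink) },
    stop() { if (release) { release(); release = null } },
  }
  obs.start()
  return obs
}

const obs = toyObserve(toyInterval(1000))
const whileObserved = activeTimers // 1: the timer is alive
obs.stop()
const afterStop = activeTimers     // 0: the timer was released
```

In this sketch, skipping `obs.stop()` would leave the timer registered, which is the kind of leak the warning above is about.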
In the examples so far, interval() returns a timer-based source that automatically pushes a value every second. This is a listenable source, which means it pushes data without being asked to.
Some sources are pullable instead, i.e. they wait for someone to pull data before sending it. For example, iterable() creates pullable sources from iterables (arrays, iterators, generators):
import { iterable, tap, observe } from 'streamlets'
const src = iterable([1, 2, 3, 4, 5])
const tapped = tap(src, x => console.log(x))
// this does nothing: the source needs to be pulled.
observe(tapped)
observe() will not pull on the source on its own, but you can request data via the returned Observation:
const src = iterable([1, 2, 3, 4, 5])
const tapped = tap(src, x => console.log(x))
const obs = observe(tapped)
obs.request()
// > 1
obs.request()
// > 2
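The difference between listenable and pullable sources can be illustrated with a small toy model. This is only a sketch under simplifying assumptions, not the streamlets implementation: `toyIterable` is a hypothetical helper, and the object it returns merely mimics the `request()` call shown above.

```javascript
// Toy model only: toyIterable is hypothetical, NOT the streamlets implementation.

// A pullable source stays silent until request() is called;
// each request pushes at most one value into the sink.
function toyIterable(items) {
  return sink => {
    const iterator = items[Symbol.iterator]()
    return {
      request() {
        const { value, done } = iterator.next()
        if (!done) sink(value)
      },
    }
  }
}

const received = []
const obs = toyIterable([1, 2, 3])(x => received.push(x))

const beforeRequest = [...received]    // []: nothing has been pulled yet
obs.request()
obs.request()
const afterTwoRequests = [...received] // [1, 2]
```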
Alternatively, you can use iterate(), which will pull the source on initialization and after receiving each data point:
import { iterable, tap, iterate } from 'streamlets'
const src = iterable([1, 2, 3, 4, 5])
const tapped = tap(src, x => console.log(x))
iterate(tapped)
// > 1, 2, 3, 4, 5
💡 iterate() returns an Iteration, which can be stopped and restarted similar to an Observation.
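The "pull on initialization and after each data point" behaviour can be sketched as a short loop. Again, this is a toy model with hypothetical helpers (`toyIterable`, `toyIterate`), not the library's code, and it ignores stopping and restarting.

```javascript
// Toy model only: hypothetical helpers, NOT the streamlets implementation.
function toyIterable(items) {
  return sink => {
    const iterator = items[Symbol.iterator]()
    return {
      request() {
        const { value, done } = iterator.next()
        if (!done) sink(value)
      },
    }
  }
}

// Pull once up front, then pull again every time a value arrives.
// (Recursive for brevity, so only suitable for short inputs.)
function toyIterate(source, onValue) {
  const pullable = source(value => {
    onValue(value)
    pullable.request()
  })
  pullable.request()
}

const seen = []
toyIterate(toyIterable([1, 2, 3, 4, 5]), x => seen.push(x))
// seen is now [1, 2, 3, 4, 5]
```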
In most cases, you need to transform the data coming from the source in some way: you might want to filter out irrelevant data points, or change every incoming value according to some mapping.
import { interval, observe, tap, map, filter } from 'streamlets'
const src = interval(1000)
// only allow even values
const filtered = filter(src, x => x % 2 === 0)
// multiply every incoming value by 10
const mapped = map(filtered, x => x * 10)
const tapped = tap(mapped, console.log)
observe(tapped)
// > 0
// > 20
// > 40
// > 60
// ...
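The expected output can be checked by running the same filter-then-map logic over a finite list. The sketch below uses plain generators as stand-ins; `toyFilter` and `toyMap` are hypothetical helpers, not the streamlets operators, and the array `ticks` stands in for the first seven values an interval would emit.

```javascript
// Toy stand-ins built on generators: NOT the streamlets operators,
// just the same filter-then-map idea applied to a finite list.
function* toyFilter(source, predicate) {
  for (const x of source) {
    if (predicate(x)) yield x
  }
}

function* toyMap(source, fn) {
  for (const x of source) yield fn(x)
}

// Stand-in for the first seven values of an interval: 0..6
const ticks = [0, 1, 2, 3, 4, 5, 6]

// only allow even values, then multiply each by 10
const filtered = toyFilter(ticks, x => x % 2 === 0)
const mapped = toyMap(filtered, x => x * 10)

const result = [...mapped] // [0, 20, 40, 60]
```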
To make these transformation chains shorter and more readable, you can use the pipe() utility:
import { interval, observe, tap, map, filter, pipe } from 'streamlets'
pipe(
  interval(1000),
  filter(x => x % 2 === 0),
  map(x => x * 10),
  tap(console.log),
  observe
)
// > 0
// > 20
// > 40
// > 60
// ...
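Conceptually, a pipe utility of this kind threads a starting value through a list of single-argument functions, left to right. The following is a generic sketch of that pattern, not the streamlets implementation: `toyPipe`, `toyFilter`, and `toyMap` are hypothetical names, and arrays stand in for sources.

```javascript
// Toy sketch of a left-to-right pipe. NOT streamlets' own pipe().
const toyPipe = (initial, ...steps) =>
  steps.reduce((acc, step) => step(acc), initial)

// Curried, array-based stand-ins for filter() and map() (hypothetical helpers):
// each takes its callback first and returns a function awaiting the input.
const toyFilter = predicate => values => values.filter(predicate)
const toyMap = fn => values => values.map(fn)

const piped = toyPipe(
  [0, 1, 2, 3, 4, 5, 6],
  toyFilter(x => x % 2 === 0),
  toyMap(x => x * 10),
)
// piped is [0, 20, 40, 60]
```

Under this reading, each step in the chain is a function that accepts the previous result, which is why the final `observe` in the example above can be passed without arguments.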