Use this class to produce artificial data and publish it via mqtt to a specific host.
# import class
from forger.engine import Manager
# init manager instance
man = Manager()
# create a new pipeline that will send data onto the mqtt topic 'foo' with 15 Hz.
pipeline = man.add_pipeline(ip='test.mosquitto.org', port=1883, topic='foo', frequency=15)
# attach a function/channel to the just created pipeline that will produce a
# sin-wave with an lower bound of -1 and upper bound of 3.
# The sine wave will have an 0.5 Hz frequency.
channel_1 = pipeline.add_channel(name='bar', scale=[-1, 3], frequency=0.5)
foo b'{"timestamp": "2019-08-28T09:38:55.814337", "bar": 0.5487491837412708}'
foo b'{"timestamp": "2019-08-28T09:38:56.821802", "bar": -0.052118018113447295}'
foo b'{"timestamp": "2019-08-28T09:38:57.805056", "bar": -0.620937663401906}'
foo b'{"timestamp": "2019-08-28T09:38:58.811185", "bar": -0.9641198905685163}'
foo b'{"timestamp": "2019-08-28T09:38:59.817089", "bar": -0.9347151370201041}'
foo b'{"timestamp": "2019-08-28T09:39:00.816615", "bar": -0.5475520657645743}'
# visualize all channels on this pipeline
pipeline.draw()
# add noise to the already existing channel 'bar'
noisy_channel = pipeline.add_channel(name='bar', channel_type='random')
# add another sine-wave as a new channel 'baz'.
# this channel has a dead time of 1 second. occuring every 2 seconds.
another_channel_id = pipeline.add_channel(name='baz', dead_frequency=0.5, dead_period=1)
# import numpy to generate some data
import numpy as np
# it is also possible to replay your own list of datapoints.
data = [np.tanh(x) for x in np.linspace(-2, 2, 100)]
# add another new channel 'qux' that replays the previously generated dataset.
# it is a good habit to define a frequency so you know the speed at which your data will be streamed.
# also note that scaling is turned off for channels with replay data (but not for interfering channels!).
replay_channel = pipeline.add_channel(name='qux', replay_data=data, frequency=0.1)
# either by still having the object itself (e.g. replay_channel from example above)
pipeline.remove_channel(channel=replay_channel)
# or by getting all channels that stream onto a certain name
forgotten_channels = pipeline.get_channels(name="bar") # this returns [channel_1, noisy_channel]
# or just by removing all channels of a pipeline (topic)
pipeline.remove_all_channels()
Check out the TODO.md.