This project implements an end-to-end real-time monitoring solution for IIoT devices, designed to measure parameters like acceleration (or others such as temperature, strain, or humidity) in mechanical equipment or civil structures, with real-time geospatial tracking.
The project is currently deployed in my homelab on a Kubernetes cluster powered by GitOps.
Note
I think a design like this serves as a solid starting point for a larger project involving real sensor hardware, including data and GPS transmission via Wi-Fi or GSM in mobile vehicles, static machinery, and civil infrastructure. RabbitMQ is quite flexible and powerful, with its routing strategies, interoperability between messaging protocols, monitoring options, and recently introduced stream queues, to say the least. Similarly, TimescaleDB bridges a significant paradigm gap, making SQL a competitive option for time-series workloads, which were traditionally dominated by NoSQL databases. The project is written in Go. Go is pretty dope.
Back in 2020, I worked on vibration analysis. My main background at that time was in Mechanical Engineering, and I took on a role that involved designing sensor installations, performing in-field measurements, and analyzing the data back at the office.
I measured various types of mechanical equipment, such as overhead cranes in a mining plant and climate control systems (including pumps, cooling towers and air handling units) at Chile's main airport. Additionally, I assessed civil structures (protected, commercial, and private buildings) subjected to nearby construction or physical phenomena, such as vibrations generated by passing trains.
All of these tasks were performed in-situ, which motivated me to consider a more ambitious approach: remote, real-time monitoring. Such a system could open up new business opportunities by offering continuous insight without requiring on-site personnel.
With that in mind, my goal for this project is to build a comprehensive end-to-end, real-time monitoring solution.
The core of this solution is built around an event-driven architecture that uses a pub/sub pattern, enabling the creation of a distributed system. However, as with many systems, a hybrid approach is necessary. This includes point-to-point communication with the sensor cluster via a command-line tool that talks to an API, allowing controlled interactions with both the database and the message broker.
A key reason for choosing RabbitMQ as the broker for this project is its strong interoperability between protocols. As of 2025, with the introduction of stream queues and the stream plugin, we can now leverage an append-only log for messages along with its own rabbitmq-stream protocol. That said, it's feasible to use multiple protocols in the same RabbitMQ instance; in this project's case: AMQP for inter-service communication, MQTT for sensor data publishing, and the stream protocol for measurement consumption, taking full advantage of the performance benefits the plugin offers over the core stream implementation.
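As a minimal sketch of the sensor-side publishing path, the snippet below uses the Eclipse Paho Go client to publish a JSON-encoded measurement through RabbitMQ's MQTT plugin. The broker address, client ID, topic, and `Measurement` struct are illustrative assumptions, not the project's actual definitions.

```go
package main

import (
	"encoding/json"
	"log"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// Measurement is a hypothetical payload shape, for illustration only.
type Measurement struct {
	SerialNumber string    `json:"serial_number"`
	Timestamp    time.Time `json:"timestamp"`
	AccelG       float64   `json:"accel_g"`
}

func main() {
	// Assumes RabbitMQ's MQTT plugin listens on the default port 1883.
	opts := mqtt.NewClientOptions().
		AddBroker("tcp://rabbitmq:1883").
		SetClientID("sensor-xxxx")

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}
	defer client.Disconnect(250)

	payload, _ := json.Marshal(Measurement{
		SerialNumber: "xxxx",
		Timestamp:    time.Now().UTC(),
		AccelG:       0.42,
	})

	// The MQTT topic is mapped by the broker to an AMQP routing key
	// (slashes become dots), assuming the plugin is configured to publish
	// into the topic exchange that feeds the measurements stream queue.
	token := client.Publish("sensor/xxxx/measurements", 1, false, payload)
	token.Wait()
	if token.Error() != nil {
		log.Fatal(token.Error())
	}
}
```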
iotctl
- A command-line tool to interact remotely with a cluster of sensors.
iot-api
- An API that facilitates communication between the service and `iotctl` users over *HTTPS*. It acts as a gateway for interacting with the database and the sensors through the message broker.
sensor-simulation
- Simulates an accelerometer measuring a specific environment. For example, it could mimic the signal of a bearing in a pump system. It consumes commands sent from `iot-api` and publishes its logs (e.g., booting logs), its serial number for enrolling the sensor in the database, as well as the measurement values.
sensor-registry
- Consumes sensor enrollment information, with a behavior like: "Look, I'm a sensor with serial number 'xxxx'. If I'm not in the database, please register me so I can start sending measurements." (A sketch of this idempotent registration follows the note below.)
sensor-logs-ingester
- Consumes sensor logs and saves them into a .log file for later centralized processing.
sensor-measurements-ingester
- Consumes sensor measurements and inserts them into the postgres/timescaledb instance.
Important
These services depend on additional software: the message broker, a database that handles time-series data with ease, a log aggregation system with an observability collector, and a visualization tool for real-time monitoring.
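To make the `sensor-registry` enrollment behaviour concrete, here is a minimal sketch of an idempotent registration. The table name, columns, and connection string are assumptions for illustration, not the project's actual schema.

```go
package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/jackc/pgx/v5/stdlib" // registers the "pgx" database/sql driver
)

// registerSensor inserts the sensor only if its serial number is not already
// present, so repeated enrollment messages are harmless (idempotent).
func registerSensor(ctx context.Context, db *sql.DB, serial string) error {
	_, err := db.ExecContext(ctx, `
		INSERT INTO sensors (serial_number, created_at)
		VALUES ($1, now())
		ON CONFLICT (serial_number) DO NOTHING`, serial)
	return err
}

func main() {
	// Hypothetical DSN; credentials and host differ in the real deployment.
	db, err := sql.Open("pgx", "postgres://iot_app:secret@timescaledb:5432/iot")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := registerSensor(context.Background(), db, "xxxx"); err != nil {
		log.Fatal(err)
	}
}
```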
Key Architectural Points
- Data Transfer: The solution is designed to use MQTT for publishing sensor measurements. JSON is currently used for serialization, although Protobuf looks particularly suitable for real-world scenarios involving embedded C or C++. As a side note, the initial deployment of the project uses Go's encoding/gob serializer to simplify development (a small encoding sketch follows this list).
- Infrastructure: This project integrates with my homelab, which simulates a cloud-like environment on bare metal using TalosOS and GitOps with FluxCD. The only component running outside the cluster is the command-line tool, which is intended to be used from a remote machine and authenticates with API keys in order to interact with the sensor cluster.
- CI/CD: For CI I'm using a private Jenkins server and Docker Hub for image storage, while the GitHub repository hosts the source code. CD is handled entirely with FluxCD in a GitOps approach.
- Secrets: I'm using Azure Key Vault for secrets in the homelab.
- Database: The solution uses PostgreSQL with TimescaleDB, an extension optimized for time-series data. In a real scenario the paid cloud tier would be in use, but for this project I'm storing everything on bare metal. While a cloud solution is generally preferred, an on-premise setup offers unparalleled flexibility. It's also worth noting that a read replica is in use, which effectively doubles the disk space required.
- Data Management: TimescaleDB's policies handle data expiration and compression, preventing storage overflow and improving performance.
- Visualization: Grafana is used for near real-time dashboards, leveraging its querying capabilities to visualize both the stored time-series data and statistics from the database itself, the latter by wrapping the output of pg_stat_statements and pg_stat_kcache in Postgres CTEs and procedures. Last but not least, Grafana also queries logs (e.g. sensor boot logs) from sources like Loki.
- Alarms: ...
- Communication Protocols:
- The sensor communicates with the message broker using the MQTT protocol, publishing into a stream queue.
- The measurements consumer service reads from the stream queue via the custom RabbitMQ stream protocol, using the single active consumer feature.
- Inter-service communication uses AMQP with RabbitMQ, employing quorum queues.
- The alarm service uses gRPC for low-latency communication with the machine behind the sensor, in order to affect its behaviour.
Disclaimer: one could conclude that a hybrid architecture for critical low-latency control would also be quite handy. In that case, gRPC would be the expected way for a service to send direct, reactive commands to change behaviour, not of the sensor itself but of the machine (or whatever else) behind it.
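To illustrate the serialization trade-off from the Data Transfer point above, the sketch below encodes the same hypothetical measurement struct with `encoding/gob` and `encoding/json`; in a production setup with embedded C/C++ firmware, Protobuf would likely replace both. The struct and field names are assumptions, not the project's actual message shape.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"encoding/json"
	"fmt"
	"time"
)

// Measurement is an illustrative payload; the real message shape may differ.
type Measurement struct {
	SerialNumber string
	Timestamp    time.Time
	AccelG       float64
}

func main() {
	m := Measurement{SerialNumber: "xxxx", Timestamp: time.Now().UTC(), AccelG: 0.42}

	// encoding/gob: compact and convenient while every peer is written in Go.
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(m); err != nil {
		panic(err)
	}

	// encoding/json: language-agnostic and human-readable, a bit larger on the wire.
	j, err := json.Marshal(m)
	if err != nil {
		panic(err)
	}

	fmt.Printf("gob: %d bytes, json: %d bytes\n", buf.Len(), len(j))
}
```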
Just threw some paint on the canvas tonight.
-- Kawhi Leonard
Directory Tree
I like the structure that emerged while developing the project, so I'm attaching the file tree since it reads nicely.
.
├── LICENSE
├── Makefile
├── README.md
├── ROADMAP.md
├── bin
│   ├── iot-api
│   ├── iot-logs-ingester
│   ├── iot-measurements-ingester
│   ├── iot-sensor-registry
│   └── iot-sensor-simulation
├── cmd
│   ├── iot-api
│   │   ├── Dockerfile
│   │   ├── air.toml
│   │   ├── handler_sensors_awake.go
│   │   ├── handler_sensors_freq.go
│   │   ├── handler_sensors_get.go
│   │   ├── handler_sensors_sleep.go
│   │   ├── handler_targets_create.go
│   │   ├── handler_targets_get.go
│   │   ├── json.go
│   │   ├── main.go
│   │   └── version.txt
│   ├── iotctl
│   │   ├── cmd
│   │   │   ├── awake.go
│   │   │   ├── changesamplefrequency.go
│   │   │   ├── delete.go
│   │   │   ├── get.go
│   │   │   ├── login.go
│   │   │   ├── logout.go
│   │   │   ├── root.go
│   │   │   └── sleep.go
│   │   ├── main.go
│   │   └── version.txt
│   ├── sensor-logs-ingester
│   │   ├── Dockerfile
│   │   ├── air.toml
│   │   ├── handlers.go
│   │   ├── main.go
│   │   └── version.txt
│   ├── sensor-measurements-ingester
│   │   ├── Dockerfile
│   │   ├── air.toml
│   │   ├── handlers.go
│   │   ├── main.go
│   │   └── version.txt
│   ├── sensor-registry
│   │   ├── Dockerfile
│   │   ├── air.toml
│   │   ├── handlers.go
│   │   ├── main.go
│   │   └── version.txt
│   └── sensor-simulation
│       ├── Dockerfile
│       ├── air.toml
│       ├── handlers.go
│       ├── main.go
│       └── version.txt
├── compose.yaml
├── dependencies
│   ├── alloy
│   │   └── alloy-config.alloy
│   ├── grafana
│   │   ├── README.md
│   │   ├── grafana.ini
│   │   └── provisioning
│   │       ├── dashboards
│   │       │   ├── dashboards.yaml
│   │       │   ├── iot.json
│   │       │   ├── queries.sql
│   │       │   ├── rabbitmq-overview.json
│   │       │   └── rabbitmq-stream.json
│   │       ├── datasources
│   │       │   └── datasources.yaml
│   │       └── plugins
│   │           └── app.yaml
│   ├── loki
│   │   └── loki-config.yaml
│   ├── prometheus
│   │   └── prometheus.yml
│   ├── rabbitmq
│   │   ├── Dockerfile
│   │   ├── definitions.json
│   │   └── rabbitmq.conf
│   └── timescaledb
│       ├── Dockerfile
│       ├── init.sh
│       └── postgresql.conf
├── go.mod
├── go.sum
├── internal
│   ├── auth
│   │   └── auth.go
│   ├── pubsub
│   │   ├── consume.go
│   │   ├── consume_test.go
│   │   └── publish.go
│   ├── routing
│   │   ├── models.go
│   │   └── routing.go
│   ├── sensorlogic
│   │   ├── awake.go
│   │   ├── changesamplefrequency.go
│   │   ├── commands.go
│   │   ├── sensor.go
│   │   ├── sensorlogs.go
│   │   ├── sensormeasurements.go
│   │   ├── sensorsignal.go
│   │   └── sleep.go
│   ├── storage
│   │   ├── db.go
│   │   ├── logs.go
│   │   ├── measurements.go
│   │   ├── models.go
│   │   ├── sensors.go
│   │   └── targets.go
│   └── validation
│       ├── sensor.go
│       └── sensor_test.go
└── utils
    └── wait-for-services.sh
Database Schema
The beauty of TimescaleDB lies in its foundation on PostgreSQL, allowing us to leverage SQL and embrace core relational database principles: normalization, ACID guarantees, and all the good stuff relational databases are built for. While relational databases are typically considered ill-suited for time-series data, TimescaleDB extends PostgreSQL to overcome this limitation, and projects like this one are a clear testament to that capability.
In PostgreSQL, the collection of databases within a server instance is referred to as a cluster. The cluster for this project consists of two databases: one named `iot`, dedicated to the project itself, and another named `monitoring`, used for tracking PostgreSQL cluster statistics. The former uses the `auto_explain`, `timescaledb`, and `postgis` extensions, while the latter employs `pg_stat_statements`, `pg_stat_kcache`, and `timescaledb` to enable real-time monitoring of database statistics.

Postgres manages access permissions through the concept of a `ROLE`. In this cluster, the following roles are defined:

- `iot`: A superuser-like role, used instead of the default `postgres` role as a security measure.
- `iot_app`: A role with full CRUD permissions on the iot database.
- `iot_replication`: A role responsible for database replication, if needed.
- `iot_readonly`: A role with read-only access to the iot database.
- `iot_monitoring`: A role with permissions to operate on the monitoring database, including access to the stats extensions' data.
As a security best practice, the `public` schema in the iot database has had its default permissions revoked for the `public` role.
Timescale hypertables do not support plain primary keys, because the underlying data must be partitioned across several physical PostgreSQL tables (chunks). A simple primary key cannot be enforced across partitions, but a composite primary key over columns that are unique together, and that includes the partitioning (time) column, can be used.
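As a sketch of how such a table could be set up (assuming the `pgx` stdlib driver and an illustrative `measurements` schema, not the project's actual DDL), the composite primary key includes the time column, which is what hypertable partitioning requires; the retention and compression policies mentioned under Data Management are added the same way, with example intervals chosen arbitrarily here.

```go
package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/jackc/pgx/v5/stdlib" // registers the "pgx" database/sql driver
)

func main() {
	// Hypothetical DSN for illustration only.
	db, err := sql.Open("pgx", "postgres://iot:secret@timescaledb:5432/iot")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx := context.Background()
	stmts := []string{
		// Composite primary key: the partitioning column (time) must be part of it.
		`CREATE TABLE IF NOT EXISTS measurements (
			time          TIMESTAMPTZ NOT NULL,
			sensor_serial TEXT        NOT NULL,
			accel_g       DOUBLE PRECISION,
			PRIMARY KEY (sensor_serial, time)
		)`,
		// Turn the table into a hypertable partitioned by time.
		`SELECT create_hypertable('measurements', 'time', if_not_exists => TRUE)`,
		// Data Management policies: compress chunks older than 7 days,
		// drop chunks older than 90 days (illustrative intervals).
		`ALTER TABLE measurements SET (timescaledb.compress)`,
		`SELECT add_compression_policy('measurements', INTERVAL '7 days', if_not_exists => TRUE)`,
		`SELECT add_retention_policy('measurements', INTERVAL '90 days', if_not_exists => TRUE)`,
	}
	for _, s := range stmts {
		if _, err := db.ExecContext(ctx, s); err != nil {
			log.Fatal(err)
		}
	}
}
```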
Messaging Routing
Some notes worth the time: stream queues are append-only and therefore stored on disk. Because of that append-only nature, a stream behaves like a 'fanout' distribution of its messages: every consumer reads the same data from the queue, since messages are not deleted from RabbitMQ after consumption and acknowledgement (retention policies exist, of course, so it is not strictly forever). This implies that having more than one consumer concurrently reading the same data is troublesome in terms of consistency and throughput, which is where the single active consumer feature for streams, released in RabbitMQ 3.11, comes into play.
Exchange:

- Type: Topic
- Name: `iot`

Queues

Queues follow the `entity.id.consumer.type` pattern:

- `sensor.all.measurements.db_writer` - Stream
- `sensor.<sensor.serial_number>.commands` - Quorum
- `sensor.all.registry.created` - Quorum
- `sensor.all.logs` - Quorum
Routing Keys

Keys are used by publishers with specific values and by consumers with wildcards:

- Publishers use specific routing keys:
  - `sensor.<sensor.serial_number>.measurements`
  - `sensor.<sensor.serial_number>.commands`
  - `sensor.<sensor.serial_number>.registry`
  - `sensor.<sensor.serial_number>.logs`
- Consumers use wildcard routing keys:
  - `sensor.*.measurements`
  - `sensor.*.commands.#`
  - `sensor.*.registry.#`
  - `sensor.*.logs.#`
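A minimal sketch of declaring part of this topology with the `rabbitmq/amqp091-go` client is shown below; the broker URL is an assumption, and in the actual deployment the topology may instead be pre-loaded from `definitions.json`. It declares the topic exchange, the measurements stream queue with an age-based retention, one quorum queue, and two of the wildcard bindings listed above.

```go
package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/") // assumed URL
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer ch.Close()

	// Topic exchange named "iot", as described above.
	if err := ch.ExchangeDeclare("iot", "topic", true, false, false, false, nil); err != nil {
		log.Fatal(err)
	}

	// Stream queue for measurements: append-only, kept on disk, retained for 7 days here.
	if _, err := ch.QueueDeclare("sensor.all.measurements.db_writer", true, false, false, false,
		amqp.Table{"x-queue-type": "stream", "x-max-age": "7D"}); err != nil {
		log.Fatal(err)
	}

	// Quorum queue for sensor logs.
	if _, err := ch.QueueDeclare("sensor.all.logs", true, false, false, false,
		amqp.Table{"x-queue-type": "quorum"}); err != nil {
		log.Fatal(err)
	}

	// Bindings using the wildcard routing keys consumed by the services.
	if err := ch.QueueBind("sensor.all.measurements.db_writer", "sensor.*.measurements", "iot", false, nil); err != nil {
		log.Fatal(err)
	}
	if err := ch.QueueBind("sensor.all.logs", "sensor.*.logs.#", "iot", false, nil); err != nil {
		log.Fatal(err)
	}
}
```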
Monitoring
TimescaleDB integrates seamlessly with Grafana, allowing real-time querying and visualization of sensor data.
Engineering Calculation Report
General Formula of Accelerometer Signal
(...)