Hybrid IIoT system using PubSub as backbone by means of Go/RabbitMQ, PostgreSQL with TimeScaleDB extension for storage, Alloy and Loki for logs, and Grafana for real-time visualization. Interaction via a CLI tool, which communicates with an HTTP REST API server. Deployed on Kubernetes through GitOps.

iferdel/sensor-data-streaming-pubsub

Sensor Data Streaming PubSub

General Description

This project implements an end-to-end real-time monitoring solution for IIoT devices, designed to measure parameters like acceleration (or others such as temperature, strain, or humidity) in mechanical equipment or civil structures, with real-time geospatial tracking.

The project is currently deployed in my homelab on a Kubernetes cluster powered by GitOps.

grafana-dashboard

Note

I think that a design like this serves as a solid starting point for a larger project involving real sensor hardware, including data and GPS transmission via Wi-Fi or GSM in both mobile vehicles and static machinery, as well as in civil infrastructure. RabbitMQ is quite flexible and powerful thanks to its routing strategies, interoperability between messaging protocols, monitoring options, and recently featured stream queues, to say the least. Similarly, TimescaleDB bridges a significant paradigm gap, making SQL a competitive option for time-series solutions, which were traditionally dominated by NoSQL databases. The project is written in Go. Go is pretty dope.

Reason

Back in 2020, I worked on vibration analysis. My main background at that time was in Mechanical Engineering, and I took on a role that involved designing sensor installations, performing in-field measurements, and analyzing the data back at the office.

I measured various types of mechanical equipment, such as overhead cranes in a mining plant and climate control systems (including pumps, cooling towers and air handling units) at Chile's main airport. Additionally, I assessed civil structures (protected, commercial, and private buildings) subjected to nearby construction or physical phenomena, such as vibrations generated by passing trains.

All of these tasks were performed in-situ, which motivated me to consider a more ambitious approach: remote, real-time monitoring. Such a system could open up new business opportunities by offering continuous insight without requiring on-site personnel.

With that in mind, my goal for this project is to build a comprehensive end-to-end, real-time monitoring solution.

🔩 Architecture

architecture-diagram

The core of this solution is built around an event-driven architecture that utilizes a pub/sub pattern, enabling the creation of a distributed system. However, as with many systems, a hybrid approach is necessary. This includes employing point-to-point communication for interactions with the sensor cluster via a command-line tool which communicates with an API, allowing controlled interactions with both the database and the message broker.

A key reason for choosing RabbitMQ as the broker for this project is its strong interoperability between protocols. As of 2025, stream queues and the stream plugin let us leverage an append-only log for messages along with the dedicated rabbitmq-stream protocol. It is therefore feasible to use multiple protocols on a single RabbitMQ instance. In this project's case: AMQP for inter-service communication, MQTT for sensor data publishing, and the stream protocol for consuming measurements, which takes advantage of the performance benefits the plugin's protocol offers over consuming a stream through the core AMQP implementation.

The services defined in the project are the following:

iotctl
A command-line tool to interact remotely with a cluster of sensors.
iot-api
An API that facilitates communication between the service and `iotctl` users over *HTTPS*. It acts as a gateway for interacting with the database and the sensors through the message broker.
sensor-simulation
Simulates an accelerometer measuring a specific environment. For example, it could mimic the signal of a bearing in a pump system. It consumes commands sent from `iot-api` and publishes its logs (e.g., booting logs), the sensor's serial number for enrollment of the sensor in the database, as well as the measurement values.
sensor-registry
Consumes sensor enrollment information, with a behavior like: "Look, I'm a sensor with serial number 'xxxx'. If I'm not in the database, please register me so I can start sending measurements."
sensor-logs-ingester
Consumes sensor logs and saves them into a .log file for centralized processing later.
sensor-measurements-ingester
Consumes sensor measurements and inserts them into the postgres/timescaledb instance.
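
The enrollment behaviour described for sensor-registry is essentially an idempotent "register if unknown" operation. Below is a minimal sketch of that idea, assuming an in-memory map as a stand-in for the sensors table. The `Registry` type and `Enroll` method are hypothetical names used for illustration, not the ones in this repository; against PostgreSQL the same effect could be obtained with something like `INSERT ... ON CONFLICT DO NOTHING`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Registry is a hypothetical in-memory stand-in for the sensors table.
type Registry struct {
	mu      sync.Mutex
	sensors map[string]bool
}

// NewRegistry returns an empty registry.
func NewRegistry() *Registry {
	return &Registry{sensors: make(map[string]bool)}
}

// Enroll registers the serial number if it is not known yet and reports
// whether a new record was created. Repeating the call is a no-op.
func (r *Registry) Enroll(serial string) (created bool, err error) {
	if serial == "" {
		return false, errors.New("empty serial number")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.sensors[serial] {
		return false, nil // already registered, nothing to do
	}
	r.sensors[serial] = true
	return true, nil
}

func main() {
	reg := NewRegistry()
	// The same sensor announcing itself twice must not create two records.
	for _, serial := range []string{"AAD-1123", "AAD-1123", "CCH-0042"} {
		created, err := reg.Enroll(serial)
		fmt.Println(serial, "created:", created, "err:", err)
	}
}
```

Making enrollment idempotent matters because a sensor may announce itself on every boot, and messages can be redelivered by the broker.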

Important

These services depend on other software: the message broker, a database that handles time-series data with ease, a log aggregation system with an observability collector, and a visualization tool for real-time monitoring.

Key Architectural Points
  • Data Transfer: The solution is designed to use MQTT for publishing sensor measurements. Currently, JSON is used for serialization, although Protobuf appears particularly suitable for real-world scenarios involving embedded C or C++. As a side note, the initial deployment of the project uses Go's encoding/gob serializer to simplify development.
  • Infrastructure: This project integrates with my homelab, which simulates a cloud-like environment on bare metal using TalosOS and GitOps with FluxCD. The only service that runs outside the cluster is the command-line tool, which is intended to run on a remote machine and authenticates with API keys in order to interact with the sensor cluster.
  • CI/CD: For CI I'm using a private Jenkins server and Docker Hub for image storage, while the GitHub repository hosts the source code. The whole CD is handled with FluxCD in a GitOps approach.
  • Secrets: I'm using Azure Key Vault for secrets in the homelab.
  • Database: The solution uses PostgreSQL with TimescaleDB, an extension optimized for time-series data. In a real scenario the paid cloud tier would be in use, but for this project I'm storing everything on bare metal. While a cloud solution is generally preferred, an on-premise setup offers greater flexibility. It's also worth noting that a read replica is in use, which effectively doubles the disk space required.
  • Data Management: TimescaleDB's policies handle data expiration and compression, preventing storage overflow and improving performance.
  • Visualization: Grafana is used for near real-time dashboards. Its querying capabilities visualize the stored time-series data as well as stats from the database itself, obtained by wrapping the output of pg_stat_statements and pg_stat_kcache with PostgreSQL CTEs and procedures. Last but not least, it queries logs (e.g. sensor boot logs) from sources like Loki.
  • Alarms: ...
  • Communication Protocols:
    • Sensors communicate with the message broker over MQTT, publishing into a stream queue.
    • The measurements consumer service reads from the stream queue over the native RabbitMQ stream protocol, using the single active consumer feature.
    • Inter-service communication uses AMQP with RabbitMQ, employing quorum queues.
    • Alarm service communication uses gRPC for low-latency communication with the machine on which the sensor is installed, in order to affect its behaviour.
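
To make the Data Transfer point more concrete, here is a minimal sketch that round-trips one measurement through both `encoding/json` and `encoding/gob` from the standard library. The `Measurement` struct, its field names, and the sample values are assumptions made for illustration; the real payload in this repository may look different.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"encoding/json"
	"fmt"
	"time"
)

// Measurement is a hypothetical payload shape; field names are assumptions.
type Measurement struct {
	SerialNumber string    `json:"serial_number"`
	Timestamp    time.Time `json:"timestamp"`
	Value        float64   `json:"value"`
}

// SampleMeasurement returns a fixed, made-up measurement for the demo.
func SampleMeasurement() Measurement {
	return Measurement{
		SerialNumber: "AAD-1123",
		Timestamp:    time.Date(2025, time.January, 1, 12, 0, 0, 0, time.UTC),
		Value:        0.0123,
	}
}

// EncodeJSON serializes a measurement as JSON (readable by non-Go clients).
func EncodeJSON(m Measurement) ([]byte, error) { return json.Marshal(m) }

// DecodeJSON parses a JSON payload back into a Measurement.
func DecodeJSON(data []byte) (Measurement, error) {
	var m Measurement
	err := json.Unmarshal(data, &m)
	return m, err
}

// EncodeGob serializes a measurement with gob (Go-only binary format).
func EncodeGob(m Measurement) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(m); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// DecodeGob parses a gob payload back into a Measurement.
func DecodeGob(data []byte) (Measurement, error) {
	var m Measurement
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&m)
	return m, err
}

func main() {
	m := SampleMeasurement()
	j, err := EncodeJSON(m)
	if err != nil {
		fmt.Println("json error:", err)
		return
	}
	g, err := EncodeGob(m)
	if err != nil {
		fmt.Println("gob error:", err)
		return
	}
	fmt.Printf("json payload (%d bytes): %s\n", len(j), j)
	fmt.Printf("gob payload: %d bytes\n", len(g))
}
```

gob is convenient while every producer and consumer is written in Go, whereas JSON (or Protobuf) keeps the door open for firmware written in C or C++.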

Disclaimer: one could conclude that a hybrid architecture would also be quite handy for critical low-latency control. In that case, gRPC would be the expected way for a service to send direct commands that change behaviour (in a reactive way), not of the sensor itself but of the machine or whatever sits behind it.

🎨 Design

Just threw some paint on the canvas tonight.

-- Kawhi Leonard

🌳 Directory Tree

I like the structure that emerged while developing the project, so I'm attaching the file tree since it reads nicely.

.
├── LICENSE
├── Makefile
├── README.md
├── ROADMAP.md
├── bin
│   ├── iot-api
│   ├── iot-logs-ingester
│   ├── iot-measurements-ingester
│   ├── iot-sensor-registry
│   └── iot-sensor-simulation
├── cmd
│   ├── iot-api
│   │   ├── Dockerfile
│   │   ├── air.toml
│   │   ├── handler_sensors_awake.go
│   │   ├── handler_sensors_freq.go
│   │   ├── handler_sensors_get.go
│   │   ├── handler_sensors_sleep.go
│   │   ├── handler_targets_create.go
│   │   ├── handler_targets_get.go
│   │   ├── json.go
│   │   ├── main.go
│   │   └── version.txt
│   ├── iotctl
│   │   ├── cmd
│   │   │   ├── awake.go
│   │   │   ├── changesamplefrequency.go
│   │   │   ├── delete.go
│   │   │   ├── get.go
│   │   │   ├── login.go
│   │   │   ├── logout.go
│   │   │   ├── root.go
│   │   │   └── sleep.go
│   │   ├── main.go
│   │   └── version.txt
│   ├── sensor-logs-ingester
│   │   ├── Dockerfile
│   │   ├── air.toml
│   │   ├── handlers.go
│   │   ├── main.go
│   │   └── version.txt
│   ├── sensor-measurements-ingester
│   │   ├── Dockerfile
│   │   ├── air.toml
│   │   ├── handlers.go
│   │   ├── main.go
│   │   └── version.txt
│   ├── sensor-registry
│   │   ├── Dockerfile
│   │   ├── air.toml
│   │   ├── handlers.go
│   │   ├── main.go
│   │   └── version.txt
│   └── sensor-simulation
│       ├── Dockerfile
│       ├── air.toml
│       ├── handlers.go
│       ├── main.go
│       └── version.txt
├── compose.yaml
├── dependencies
│   ├── alloy
│   │   └── alloy-config.alloy
│   ├── grafana
│   │   ├── README.md
│   │   ├── grafana.ini
│   │   └── provisioning
│   │       ├── dashboards
│   │       │   ├── dashboards.yaml
│   │       │   ├── iot.json
│   │       │   ├── queries.sql
│   │       │   ├── rabbitmq-overview.json
│   │       │   └── rabbitmq-stream.json
│   │       ├── datasources
│   │       │   └── datasources.yaml
│   │       └── plugins
│   │           └── app.yaml
│   ├── loki
│   │   └── loki-config.yaml
│   ├── prometheus
│   │   └── prometheus.yml
│   ├── rabbitmq
│   │   ├── Dockerfile
│   │   ├── definitions.json
│   │   └── rabbitmq.conf
│   └── timescaledb
│       ├── Dockerfile
│       ├── init.sh
│       └── postgresql.conf
├── go.mod
├── go.sum
├── internal
│   ├── auth
│   │   └── auth.go
│   ├── pubsub
│   │   ├── consume.go
│   │   ├── consume_test.go
│   │   └── publish.go
│   ├── routing
│   │   ├── models.go
│   │   └── routing.go
│   ├── sensorlogic
│   │   ├── awake.go
│   │   ├── changesamplefrequency.go
│   │   ├── commands.go
│   │   ├── sensor.go
│   │   ├── sensorlogs.go
│   │   ├── sensormeasurements.go
│   │   ├── sensorsignal.go
│   │   └── sleep.go
│   ├── storage
│   │   ├── db.go
│   │   ├── logs.go
│   │   ├── measurements.go
│   │   ├── models.go
│   │   ├── sensors.go
│   │   └── targets.go
│   └── validation
│       ├── sensor.go
│       └── sensor_test.go
└── utils
    └── wait-for-services.sh
🐘 🐯 Database Schema

The beauty of TimescaleDB lies in its foundation on PostgreSQL, allowing us to leverage SQL and embrace core relational database principles, such as normalization, ACID, and all the cool stuff that relational databases are meant for. While relational databases are commonly regarded as ill-suited for time-series data, TimescaleDB extends PostgreSQL to overcome this limitation, and projects like this one are a clear testament to its capability.

In PostgreSQL, the collection of databases within a server instance is referred to as a cluster. The cluster for this project consists of two databases: one named iot, dedicated to the project itself, and another named monitoring, used for tracking PostgreSQL cluster statistics. The former uses the auto_explain, timescaledb, and postgis extensions, while the latter employs pg_stat_statements, pg_stat_kcache, and timescaledb to enable real-time monitoring of database statistics.

Postgres manages access permissions using the ROLE terminology. In this cluster, the following roles are defined:

  • iot: A superuser-like role, used instead of the default postgres role as a security measure.
  • iot_app: A role with full CRUD permissions on the iot database.
  • iot_replication: A role responsible for database replication, if needed.
  • iot_readonly: A role with read-only access to the iot database.
  • iot_monitoring: A role with permissions to operate on the monitoring database, including access to the stats extensions data.

As a security best practice, the public schema in the iot database has had its default permissions revoked for the public role.

iot-db-erd

TimescaleDB hypertables do not support a primary key on a surrogate column alone. The underlying data is partitioned into chunks, each of them a physical PostgreSQL table, so uniqueness can only be enforced when the key includes the partitioning column(s). A composite primary key made of columns that are unique together, including the time column, can be used instead.

🐰 Messaging Routing

Some notes worth the time: stream queues are append-only logs persisted on disk. Because consuming is non-destructive, a stream behaves much like a 'fanout' distribution of its messages: every consumer reads the same data, since messages are not deleted from RabbitMQ after consumption and ack (of course, retention policies eventually truncate the log, so it is not strictly like that). This implies that having more than one consumer reading the same stream concurrently means each of them processes the same messages, which is troublesome for consistency and gains nothing in throughput. That's why the single active consumer feature for streams, released in RabbitMQ 3.11, comes into play.
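
The non-destructive read behaviour can be illustrated with a toy, in-memory model of an append-only log. This is a sketch under stated assumptions and not the RabbitMQ stream API: `Stream`, `Consumer`, `Append`, and `Poll` are hypothetical names, and the only point is that each consumer keeps its own offset while the log itself is never modified by reads.

```go
package main

import "fmt"

// Stream is a toy in-memory append-only log (not the RabbitMQ client API).
type Stream struct {
	entries []string
}

// Append adds a message at the end of the log and returns its offset.
func (s *Stream) Append(msg string) int {
	s.entries = append(s.entries, msg)
	return len(s.entries) - 1
}

// ReadFrom returns a copy of every message at or after offset.
// Reading never removes anything from the log.
func (s *Stream) ReadFrom(offset int) []string {
	if offset < 0 {
		offset = 0
	}
	if offset >= len(s.entries) {
		return nil
	}
	out := make([]string, len(s.entries)-offset)
	copy(out, s.entries[offset:])
	return out
}

// Consumer tracks its own position in the stream.
type Consumer struct {
	name string
	next int // offset of the next unread message
}

// Poll returns the unread messages and advances only this consumer's offset.
func (c *Consumer) Poll(s *Stream) []string {
	msgs := s.ReadFrom(c.next)
	c.next += len(msgs)
	return msgs
}

func main() {
	s := &Stream{}
	s.Append("measurement-1")
	s.Append("measurement-2")

	// Two independent consumers both receive every message, so two active
	// database writers would insert each measurement twice.
	a := &Consumer{name: "db_writer-1"}
	b := &Consumer{name: "db_writer-2"}
	fmt.Println(a.name, a.Poll(s))
	fmt.Println(b.name, b.Poll(s))
}
```

With a single active consumer, only one member of the group would be polling at any time, and another would take over from the stored offset if it went away.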

Exchange:

  • Type: Topic
  • Name: iot

Queues
Queues follow the entity.id.consumer.type pattern:

  • sensor.all.measurements.db_writer - Stream
  • sensor.<sensor.serial_number>.commands - Quorum
  • sensor.all.registry.created - Quorum
  • sensor.all.logs - Quorum

Routing Keys
Keys are used by publishers with specific values and by consumers with wildcards:

  • Publishers use specific routing keys:

    • sensor.<sensor.serial_number>.measurements
    • sensor.<sensor.serial_number>.commands
    • sensor.<sensor.serial_number>.registry
    • sensor.<sensor.serial_number>.logs
  • Consumers use wildcard routing keys:

    • sensor.*.measurements
    • sensor.*.commands.#
    • sensor.*.registry.#
    • sensor.*.logs.#
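
To show how the wildcard bindings above relate to the specific keys used by publishers, here is a minimal sketch of AMQP topic matching, where `*` stands for exactly one dot-separated word and `#` for zero or more words. `MatchTopic` and `RoutingKey` are hypothetical helpers written for this example; in the running system the broker performs this matching, not the services.

```go
package main

import (
	"fmt"
	"strings"
)

// RoutingKey builds a publisher key such as "sensor.AAD-1123.logs".
func RoutingKey(serial, kind string) string {
	return "sensor." + serial + "." + kind
}

// MatchTopic reports whether routingKey matches a topic binding pattern.
// "*" matches exactly one word, "#" matches zero or more words.
func MatchTopic(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

func matchWords(pattern, key []string) bool {
	if len(pattern) == 0 {
		return len(key) == 0
	}
	switch pattern[0] {
	case "#":
		// Either "#" absorbs nothing, or it absorbs one word and we retry.
		if matchWords(pattern[1:], key) {
			return true
		}
		return len(key) > 0 && matchWords(pattern, key[1:])
	case "*":
		return len(key) > 0 && matchWords(pattern[1:], key[1:])
	default:
		return len(key) > 0 && pattern[0] == key[0] && matchWords(pattern[1:], key[1:])
	}
}

func main() {
	key := RoutingKey("AAD-1123", "commands")
	for _, binding := range []string{
		"sensor.*.measurements",
		"sensor.*.commands.#",
		"sensor.*.registry.#",
		"sensor.*.logs.#",
	} {
		fmt.Printf("%-24s matches %s: %v\n", binding, key, MatchTopic(binding, key))
	}
}
```

Note that a trailing `#` also matches when no extra words follow, which is why `sensor.*.commands.#` still receives keys of the form `sensor.<serial>.commands`.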

💻 Monitoring

TimeScaleDB integrates seamlessly with Grafana, allowing real-time querying and visualization of sensor data.

Engineering Calculation Report

General Formula of Accelerometer Signal
$a(t) = A \sin(\omega t + \varphi)$
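
As a rough illustration of how a simulated sensor could evaluate this formula, the sketch below samples the sine at a fixed rate. It assumes $\omega = 2\pi f$ with $f$ in hertz; the function names, parameters, and the values used in `main` are hypothetical and not taken from the project's sensor-simulation code.

```go
package main

import (
	"fmt"
	"math"
)

// Acceleration evaluates a(t) = A*sin(omega*t + phase), with omega = 2*pi*f.
func Acceleration(amplitude, freqHz, phase, t float64) float64 {
	omega := 2 * math.Pi * freqHz
	return amplitude * math.Sin(omega*t+phase)
}

// Samples returns n values of the signal taken at sampleRateHz from t = 0.
func Samples(amplitude, freqHz, phase, sampleRateHz float64, n int) []float64 {
	out := make([]float64, n)
	for i := range out {
		t := float64(i) / sampleRateHz
		out[i] = Acceleration(amplitude, freqHz, phase, t)
	}
	return out
}

// approxEqual compares floats within a tolerance.
func approxEqual(a, b, tol float64) bool {
	return math.Abs(a-b) <= tol
}

func main() {
	// Made-up values: a 1 Hz signal of amplitude 2, sampled 8 times per second.
	for i, a := range Samples(2, 1, 0, 8, 8) {
		fmt.Printf("sample %d: %+.4f\n", i, a)
	}
}
```

The sampling rate should stay well above twice the signal frequency, otherwise the simulated series would alias.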

Examples

(...)
