Skip to content

Commit

Permalink
Support arbitrary position of device id in topic
Browse files Browse the repository at this point in the history
The device firmware tasmota places the device ID in the middle of the
topic path. To support those devices to this commit introduce the
device_id_regex configuration. The old behavior of the last element of
the topic path is unchanged.
  • Loading branch information
Christoph Petrausch committed Jul 18, 2020
1 parent ec23d61 commit 5680206
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 13 deletions.
2 changes: 1 addition & 1 deletion cmd/mqtt2prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func main() {
mqttClientOptions.SetPassword(cfg.MQTT.Password)

collector := metrics.NewCollector(cfg.Cache.Timeout, cfg.Metrics)
ingest := metrics.NewIngest(collector, cfg.Metrics)
ingest := metrics.NewIngest(collector, cfg.Metrics, cfg.MQTT.DeviceIDRegex)

errorChan := make(chan error, 1)

Expand Down
5 changes: 5 additions & 0 deletions config.yaml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ mqtt:
# password: happylittleclouds
# The Topic path to subscripe to. Actually this will become `$topic_path/+`
topic_path: v1/devices/me
# Optional: Regular expression to extract the device ID from the topic path. The default regular expression, assumes
# that the last "element" of the topic_path is the device id.
# The regular expression must contain a named capture group with the name deviceid
# For example the expression for tasamota based sensors is "tele/(?P<deviceid>.*)/.*"
# device_id_regex: "(.*/)?(?P<deviceid>.*)"
# The MQTT QoS level
qos: 0
cache:
Expand Down
57 changes: 48 additions & 9 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"fmt"
"io/ioutil"
"regexp"
"time"
Expand All @@ -12,10 +13,13 @@ import (
const GaugeValueType = "gauge"
const CounterValueType = "counter"

const DeviceIDRegexGroup = "deviceid"

var MQTTConfigDefaults = MQTTConfig{
Server: "tcp://127.0.0.1:1883",
TopicPath: "v1/devices/me",
QoS: 0,
Server: "tcp://127.0.0.1:1883",
TopicPath: "v1/devices/me",
DeviceIDRegex: mustNewRegexp(fmt.Sprintf("(.*/)?(?P<%s>.*)", DeviceIDRegexGroup)),
QoS: 0,
}

var CacheConfigDefaults = CacheConfig{
Expand Down Expand Up @@ -49,6 +53,29 @@ func (rf *Regexp) Match(s string) bool {
return rf.r == nil || rf.r.MatchString(s)
}

// GroupValue returns the value of the given group. If the group is not part of the underlying regexp, returns the empty string.
func (rf *Regexp) GroupValue(s string, groupName string) string {
match := rf.r.FindStringSubmatch(s)
groupValues := make(map[string]string)
for i, name := range rf.r.SubexpNames() {
if name != "" {
groupValues[name] = match[i]
}
}
return groupValues[groupName]
}

func (rf *Regexp) RegEx() *regexp.Regexp {
return rf.r
}

func mustNewRegexp(pattern string) *Regexp {
return &Regexp{
pattern: pattern,
r: regexp.MustCompile(pattern),
}
}

type Config struct {
Metrics []MetricConfig `yaml:"metrics"`
MQTT *MQTTConfig `yaml:"mqtt,omitempty"`
Expand All @@ -60,12 +87,12 @@ type CacheConfig struct {
}

type MQTTConfig struct {
Server string `yaml:"server"`
TopicPath string `yaml:"topic_path"`
DeviceIDRegex Regexp `yaml:"deviceIDRegex"`
User string `yaml:"user"`
Password string `yaml:"password"`
QoS byte `yaml:"qos"`
Server string `yaml:"server"`
TopicPath string `yaml:"topic_path"`
DeviceIDRegex *Regexp `yaml:"device_id_regex"`
User string `yaml:"user"`
Password string `yaml:"password"`
QoS byte `yaml:"qos"`
}

// Metrics Config is a mapping between a metric send on mqtt to a prometheus metric
Expand Down Expand Up @@ -118,5 +145,17 @@ func LoadConfig(configFile string) (Config, error) {
if cfg.Cache == nil {
cfg.Cache = &CacheConfigDefaults
}
if cfg.MQTT.DeviceIDRegex == nil {
cfg.MQTT.DeviceIDRegex = MQTTConfigDefaults.DeviceIDRegex
}
var validRegex bool
for _, name := range cfg.MQTT.DeviceIDRegex.RegEx().SubexpNames() {
if name == DeviceIDRegexGroup {
validRegex = true
}
}
if !validRegex {
return Config{}, fmt.Errorf("device id regex %q does not contain required regex group %q", cfg.MQTT.DeviceIDRegex.pattern, DeviceIDRegexGroup)
}
return cfg, nil
}
12 changes: 9 additions & 3 deletions pkg/metrics/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"log"
"path/filepath"
"strconv"
"time"

Expand All @@ -15,18 +14,20 @@ import (

type Ingest struct {
metricConfigs map[string][]config.MetricConfig
deviceIDRegex *config.Regexp
collector Collector
MessageMetric *prometheus.CounterVec
}

func NewIngest(collector Collector, metrics []config.MetricConfig) *Ingest {
func NewIngest(collector Collector, metrics []config.MetricConfig, deviceIDRegex *config.Regexp) *Ingest {
cfgs := make(map[string][]config.MetricConfig)
for i := range metrics {
key := metrics[i].MQTTName
cfgs[key] = append(cfgs[key], metrics[i])
}
return &Ingest{
metricConfigs: cfgs,
deviceIDRegex: deviceIDRegex,
collector: collector,
MessageMetric: prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -112,7 +113,7 @@ func (i *Ingest) store(deviceID string, rawMetrics MQTTPayload) error {
func (i *Ingest) SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHandler {
return func(c mqtt.Client, m mqtt.Message) {
log.Printf("Got message '%s' on topic %s\n", string(m.Payload()), m.Topic())
deviceId := filepath.Base(m.Topic())
deviceId := i.deviceID(m.Topic())
var rawMetrics MQTTPayload
err := json.Unmarshal(m.Payload(), &rawMetrics)
if err != nil {
Expand All @@ -129,3 +130,8 @@ func (i *Ingest) SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHand
i.MessageMetric.WithLabelValues("success", m.Topic()).Inc()
}
}

// deviceID uses the configured DeviceIDRegex to extract the device ID from the given mqtt topic path.
func (i *Ingest) deviceID(topic string) string {
return i.deviceIDRegex.GroupValue(topic, config.DeviceIDRegexGroup)
}

0 comments on commit 5680206

Please sign in to comment.