Skip to content

Commit

Permalink
Append source topic to metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Christoph Petrausch committed Jul 18, 2020
1 parent d55b834 commit 800b8c0
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ type StringValueMappingConfig struct {

func (mc *MetricConfig) PrometheusDescription() *prometheus.Desc {
return prometheus.NewDesc(
mc.PrometheusName, mc.Help, []string{"sensor"}, mc.ConstantLabels,
mc.PrometheusName, mc.Help, []string{"sensor", "topic"}, mc.ConstantLabels,
)
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Metric struct {
Value float64
ValueType prometheus.ValueType
IngestTime time.Time
Topic string
}

type MetricCollection []Metric
Expand Down Expand Up @@ -59,6 +60,7 @@ func (c *MemoryCachedCollector) Collect(mc chan<- prometheus.Metric) {
metric.ValueType,
metric.Value,
device,
metric.Topic,
)
if err != nil {
panic(err)
Expand Down
9 changes: 4 additions & 5 deletions pkg/metrics/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ func (i *Ingest) validMetric(metric string, deviceID string) (config.MetricConfi
return config.MetricConfig{}, false
}

type MQTTPayload map[string]interface{}

func (i *Ingest) store(deviceID string, payload []byte) error {
func (i *Ingest) store(topic string, payload []byte) error {
var mc MetricCollection
deviceID := i.deviceID(topic)
parsed := gojsonq.New().FromString(string(payload))

for path := range i.metricConfigs {
Expand All @@ -66,6 +65,7 @@ func (i *Ingest) store(deviceID string, payload []byte) error {
if err != nil {
return fmt.Errorf("failed to parse valid metric value: %w", err)
}
m.Topic = topic
mc = append(mc, m)
}

Expand Down Expand Up @@ -128,9 +128,8 @@ func (i *Ingest) parseMetric(metricPath string, deviceID string, value interface
func (i *Ingest) SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHandler {
return func(c mqtt.Client, m mqtt.Message) {
log.Printf("Got message '%s' on topic %s\n", string(m.Payload()), m.Topic())
deviceId := i.deviceID(m.Topic())

err := i.store(deviceId, m.Payload())
err := i.store(m.Topic(), m.Payload())
if err != nil {
errChan <- fmt.Errorf("could not store metrics '%s' on topic %s: %s", string(m.Payload()), m.Topic(), err.Error())
i.MessageMetric.WithLabelValues("storeError", m.Topic()).Inc()
Expand Down

0 comments on commit 800b8c0

Please sign in to comment.