Skip to content

Commit

Permalink
Moved to gojsonq to support nested paths
Browse files Browse the repository at this point in the history
  • Loading branch information
Christoph Petrausch committed Jul 18, 2020
1 parent 5680206 commit 29eb925
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 50 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ require (
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
github.com/prometheus/procfs v0.0.11
github.com/thedevsaddam/gojsonq v2.3.0+incompatible // indirect
github.com/thedevsaddam/gojsonq/v2 v2.5.2
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980
gopkg.in/yaml.v2 v2.2.5
)
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/thedevsaddam/gojsonq v1.9.1 h1:zQulEP43nwmq5EKrNWyIgJVbqDeMdC1qzXM/f5O15a0=
github.com/thedevsaddam/gojsonq v2.3.0+incompatible h1:i2lFTvGY4LvoZ2VUzedsFlRiyaWcJm3Uh6cQ9+HyQA8=
github.com/thedevsaddam/gojsonq v2.3.0+incompatible/go.mod h1:RBcQaITThgJAAYKH7FNp2onYodRz8URfsuEGpAch0NA=
github.com/thedevsaddam/gojsonq/v2 v2.5.2 h1:CoMVaYyKFsVj6TjU6APqAhAvC07hTI6IQen8PHzHYY0=
github.com/thedevsaddam/gojsonq/v2 v2.5.2/go.mod h1:bv6Xa7kWy82uT0LnXPE2SzGqTj33TAEeR560MdJkiXs=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down
108 changes: 58 additions & 50 deletions pkg/metrics/ingest.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package metrics

import (
"encoding/json"
"fmt"
"log"
"strconv"
"time"

gojsonq "github.com/thedevsaddam/gojsonq/v2"

"github.com/eclipse/paho.mqtt.golang"
"github.com/hikhvar/mqtt2prometheus/pkg/config"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -51,77 +52,84 @@ func (i *Ingest) validMetric(metric string, deviceID string) (config.MetricConfi

type MQTTPayload map[string]interface{}

func (i *Ingest) store(deviceID string, rawMetrics MQTTPayload) error {
func (i *Ingest) store(deviceID string, payload []byte) error {
var mc MetricCollection
parsed := gojsonq.New().FromString(string(payload))

for path := range i.metricConfigs {
rawValue := parsed.Find(path)
parsed.Reset()
fmt.Printf("query path: %q data: %v\n", path, rawValue)

for metricName, value := range rawMetrics {
cfg, cfgFound := i.validMetric(metricName, deviceID)
if !cfgFound {
continue
m, err := i.parseMetric(path, deviceID, rawValue)
if err != nil {
return fmt.Errorf("failed to parse valid metric value: %w", err)
}
mc = append(mc, m)
}

var metricValue float64
i.collector.Observe(deviceID, mc)
return nil
}

if boolValue, ok := value.(bool); ok {
if boolValue {
metricValue = 1
} else {
metricValue = 0
}
} else if strValue, ok := value.(string); ok {
func (i *Ingest) parseMetric(metricPath string, deviceID string, value interface{}) (Metric, error) {
cfg, cfgFound := i.validMetric(metricPath, deviceID)
if !cfgFound {
return Metric{}, nil
}

// If string value mapping is defined, use that
if cfg.StringValueMapping != nil {
var metricValue float64

floatValue, ok := cfg.StringValueMapping.Map[strValue]
if ok {
metricValue = floatValue
} else if cfg.StringValueMapping.ErrorValue != nil {
metricValue = *cfg.StringValueMapping.ErrorValue
} else {
return fmt.Errorf("got unexpected string data '%s'", strValue)
}
if boolValue, ok := value.(bool); ok {
if boolValue {
metricValue = 1
} else {
metricValue = 0
}
} else if strValue, ok := value.(string); ok {

} else {
// If string value mapping is defined, use that
if cfg.StringValueMapping != nil {

// otherwise try to parse float
floatValue, err := strconv.ParseFloat(strValue, 64)
if err != nil {
return fmt.Errorf("got data with unexpectd type: %T ('%s') and failed to parse to float", value, value)
}
floatValue, ok := cfg.StringValueMapping.Map[strValue]
if ok {
metricValue = floatValue

} else if cfg.StringValueMapping.ErrorValue != nil {
metricValue = *cfg.StringValueMapping.ErrorValue
} else {
return Metric{}, fmt.Errorf("got unexpected string data '%s'", strValue)
}

} else if floatValue, ok := value.(float64); ok {
metricValue = floatValue
} else {
return fmt.Errorf("got data with unexpectd type: %T ('%s')", value, value)

// otherwise try to parse float
floatValue, err := strconv.ParseFloat(strValue, 64)
if err != nil {
return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s') and failed to parse to float", value, value)
}
metricValue = floatValue

}

mc = append(mc, Metric{
Description: cfg.PrometheusDescription(),
Value: metricValue,
ValueType: cfg.PrometheusValueType(),
IngestTime: time.Now(),
})
} else if floatValue, ok := value.(float64); ok {
metricValue = floatValue
} else {
return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s')", value, value)
}
i.collector.Observe(deviceID, mc)
return nil
return Metric{
Description: cfg.PrometheusDescription(),
Value: metricValue,
ValueType: cfg.PrometheusValueType(),
IngestTime: time.Now(),
}, nil
}

func (i *Ingest) SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHandler {
return func(c mqtt.Client, m mqtt.Message) {
log.Printf("Got message '%s' on topic %s\n", string(m.Payload()), m.Topic())
deviceId := i.deviceID(m.Topic())
var rawMetrics MQTTPayload
err := json.Unmarshal(m.Payload(), &rawMetrics)
if err != nil {
errChan <- fmt.Errorf("could not decode message '%s' on topic %s: %s", string(m.Payload()), m.Topic(), err.Error())
i.MessageMetric.WithLabelValues("decodeError", m.Topic()).Desc()
return
}
err = i.store(deviceId, rawMetrics)

err := i.store(deviceId, m.Payload())
if err != nil {
errChan <- fmt.Errorf("could not store metrics '%s' on topic %s: %s", string(m.Payload()), m.Topic(), err.Error())
i.MessageMetric.WithLabelValues("storeError", m.Topic()).Inc()
Expand Down

0 comments on commit 29eb925

Please sign in to comment.