Skip to content

Commit

Permalink
Add possibility for GroupValueRead at startup
Browse files Browse the repository at this point in the history
  • Loading branch information
simon mittelberger committed Aug 1, 2023
1 parent 5fb05a3 commit 1e88678
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/knx/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type Config struct {
// MetricsPrefix is a short prefix which will be added in front of the actual metric name.
MetricsPrefix string
AddressConfigs GroupAddressConfigSet
// ReadStartupInterval is the intervall to wait between read of group addresses after startup.
ReadStartupInterval Duration `json:",omitempty"`
}

// ReadConfig reads the given configuration file and returns the parsed Config object.
Expand Down Expand Up @@ -127,6 +129,8 @@ type GroupAddressConfig struct {
MetricType string
// Export the metric to prometheus
Export bool
// ReadStartup allows the exporter to actively send `GroupValueRead` telegrams to actively read the value at startup instead waiting for it.
ReadStartup bool `json:",omitempty"`
// ReadActive allows the exporter to actively send `GroupValueRead` telegrams to actively poll the value instead waiting for it.
ReadActive bool `json:",omitempty"`
// MaxAge of a value until it will actively send a `GroupValueRead` telegram to read the value if ReadActive is set to true.
Expand Down
7 changes: 7 additions & 0 deletions pkg/knx/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type metricsExporter struct {
metrics MetricSnapshotHandler
listener Listener
messageCounter *prometheus.CounterVec
startupReader StartupReader
poller Poller
health error
}
Expand Down Expand Up @@ -64,6 +65,9 @@ func (e *metricsExporter) Run() error {
return err
}

e.startupReader = NewStartupReader(e.config, e.client, e.metrics, e.messageCounter)
e.startupReader.Run()

e.poller = NewPoller(e.config, e.client, e.metrics, e.messageCounter)
e.poller.Run()

Expand All @@ -75,6 +79,9 @@ func (e *metricsExporter) Run() error {
}

func (e *metricsExporter) Close() {
if e.startupReader != nil {
e.startupReader.Close()
}
if e.poller != nil {
e.poller.Close()
}
Expand Down
96 changes: 96 additions & 0 deletions pkg/knx/startup-reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package knx

import (
"reflect"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/vapourismo/knx-go/knx"
"github.com/vapourismo/knx-go/knx/cemi"
)

// StartupReader defines the interface for active polling for metrics values against the knx system at startup.
type StartupReader interface {
// Run starts the startup reading.
Run()
// Close stops the startup reading.
Close()
}

type startupReader struct {
client GroupClient
config *Config
messageCounter *prometheus.CounterVec
snapshotHandler MetricSnapshotHandler
metricsToRead GroupAddressConfigSet
ticker *time.Ticker
}

// NewStartupReader creates a new StartupReader instance using the given MetricsExporter for connection handling and metrics observing.
func NewStartupReader(config *Config, client GroupClient, metricsHandler MetricSnapshotHandler, messageCounter *prometheus.CounterVec) StartupReader {
metricsToRead := getMetricsToRead(config)
return &startupReader{
client: client,
config: config,
messageCounter: messageCounter,
snapshotHandler: metricsHandler,
metricsToRead: metricsToRead,
}
}

func (s *startupReader) Run() {
readInterval := time.Duration(s.config.ReadStartupInterval)
if readInterval.Milliseconds() <= 0 {
readInterval = time.Duration(200 * time.Millisecond)
}
logrus.Infof("start reading addresses after startup in %dms intervals.", readInterval.Milliseconds())
s.ticker = time.NewTicker(readInterval)
c := s.ticker.C
go func(s *startupReader) {
addressesToRead := reflect.ValueOf(s.metricsToRead).MapKeys()
for range c {
if len(addressesToRead) == 0 {
break
}
addressToRead := addressesToRead[0].Interface().(GroupAddress)
s.sendReadMessage(addressToRead)
addressesToRead = addressesToRead[1:]
}
s.ticker.Stop()
}(s)
}

func (s *startupReader) Close() {
if s.ticker != nil {
s.ticker.Stop()
}
}

func (s *startupReader) sendReadMessage(address GroupAddress) {
event := knx.GroupEvent{
Command: knx.GroupRead,
Destination: cemi.GroupAddr(address),
Source: cemi.IndividualAddr(s.config.Connection.PhysicalAddress),
}

if e := s.client.Send(event); e != nil {
logrus.Errorf("can not send read request for %s: %s", address.String(), e)
}
s.messageCounter.WithLabelValues("sent", "true").Inc()
}

func getMetricsToRead(config *Config) GroupAddressConfigSet {
toRead := make(GroupAddressConfigSet)
for address, addressConfig := range config.AddressConfigs {
if !addressConfig.Export || !addressConfig.ReadStartup {
continue
}

toRead[address] = GroupAddressConfig{
Name: config.NameFor(addressConfig),
ReadStartup: true,
}
}
return toRead
}

0 comments on commit 1e88678

Please sign in to comment.