From 1e8867802a088a9829e2c93240ff74285f2c9838 Mon Sep 17 00:00:00 2001 From: simon mittelberger <> Date: Tue, 1 Aug 2023 22:07:02 +0200 Subject: [PATCH 1/7] Add possibility for GroupValueRead at startup --- pkg/knx/config.go | 4 ++ pkg/knx/exporter.go | 7 +++ pkg/knx/startup-reader.go | 96 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 107 insertions(+) create mode 100644 pkg/knx/startup-reader.go diff --git a/pkg/knx/config.go b/pkg/knx/config.go index e70d3df..45555ec 100644 --- a/pkg/knx/config.go +++ b/pkg/knx/config.go @@ -31,6 +31,8 @@ type Config struct { // MetricsPrefix is a short prefix which will be added in front of the actual metric name. MetricsPrefix string AddressConfigs GroupAddressConfigSet + // ReadStartupInterval is the intervall to wait between read of group addresses after startup. + ReadStartupInterval Duration `json:",omitempty"` } // ReadConfig reads the given configuration file and returns the parsed Config object. @@ -127,6 +129,8 @@ type GroupAddressConfig struct { MetricType string // Export the metric to prometheus Export bool + // ReadStartup allows the exporter to actively send `GroupValueRead` telegrams to actively read the value at startup instead waiting for it. + ReadStartup bool `json:",omitempty"` // ReadActive allows the exporter to actively send `GroupValueRead` telegrams to actively poll the value instead waiting for it. ReadActive bool `json:",omitempty"` // MaxAge of a value until it will actively send a `GroupValueRead` telegram to read the value if ReadActive is set to true. diff --git a/pkg/knx/exporter.go b/pkg/knx/exporter.go index 2b28cd4..17e2112 100644 --- a/pkg/knx/exporter.go +++ b/pkg/knx/exporter.go @@ -37,6 +37,7 @@ type metricsExporter struct { metrics MetricSnapshotHandler listener Listener messageCounter *prometheus.CounterVec + startupReader StartupReader poller Poller health error } @@ -64,6 +65,9 @@ func (e *metricsExporter) Run() error { return err } + e.startupReader = NewStartupReader(e.config, e.client, e.metrics, e.messageCounter) + e.startupReader.Run() + e.poller = NewPoller(e.config, e.client, e.metrics, e.messageCounter) e.poller.Run() @@ -75,6 +79,9 @@ func (e *metricsExporter) Run() error { } func (e *metricsExporter) Close() { + if e.startupReader != nil { + e.startupReader.Close() + } if e.poller != nil { e.poller.Close() } diff --git a/pkg/knx/startup-reader.go b/pkg/knx/startup-reader.go new file mode 100644 index 0000000..2647c34 --- /dev/null +++ b/pkg/knx/startup-reader.go @@ -0,0 +1,96 @@ +package knx + +import ( + "reflect" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "github.com/vapourismo/knx-go/knx" + "github.com/vapourismo/knx-go/knx/cemi" +) + +// StartupReader defines the interface for active polling for metrics values against the knx system at startup. +type StartupReader interface { + // Run starts the startup reading. + Run() + // Close stops the startup reading. + Close() +} + +type startupReader struct { + client GroupClient + config *Config + messageCounter *prometheus.CounterVec + snapshotHandler MetricSnapshotHandler + metricsToRead GroupAddressConfigSet + ticker *time.Ticker +} + +// NewStartupReader creates a new StartupReader instance using the given MetricsExporter for connection handling and metrics observing. +func NewStartupReader(config *Config, client GroupClient, metricsHandler MetricSnapshotHandler, messageCounter *prometheus.CounterVec) StartupReader { + metricsToRead := getMetricsToRead(config) + return &startupReader{ + client: client, + config: config, + messageCounter: messageCounter, + snapshotHandler: metricsHandler, + metricsToRead: metricsToRead, + } +} + +func (s *startupReader) Run() { + readInterval := time.Duration(s.config.ReadStartupInterval) + if readInterval.Milliseconds() <= 0 { + readInterval = time.Duration(200 * time.Millisecond) + } + logrus.Infof("start reading addresses after startup in %dms intervals.", readInterval.Milliseconds()) + s.ticker = time.NewTicker(readInterval) + c := s.ticker.C + go func(s *startupReader) { + addressesToRead := reflect.ValueOf(s.metricsToRead).MapKeys() + for range c { + if len(addressesToRead) == 0 { + break + } + addressToRead := addressesToRead[0].Interface().(GroupAddress) + s.sendReadMessage(addressToRead) + addressesToRead = addressesToRead[1:] + } + s.ticker.Stop() + }(s) +} + +func (s *startupReader) Close() { + if s.ticker != nil { + s.ticker.Stop() + } +} + +func (s *startupReader) sendReadMessage(address GroupAddress) { + event := knx.GroupEvent{ + Command: knx.GroupRead, + Destination: cemi.GroupAddr(address), + Source: cemi.IndividualAddr(s.config.Connection.PhysicalAddress), + } + + if e := s.client.Send(event); e != nil { + logrus.Errorf("can not send read request for %s: %s", address.String(), e) + } + s.messageCounter.WithLabelValues("sent", "true").Inc() +} + +func getMetricsToRead(config *Config) GroupAddressConfigSet { + toRead := make(GroupAddressConfigSet) + for address, addressConfig := range config.AddressConfigs { + if !addressConfig.Export || !addressConfig.ReadStartup { + continue + } + + toRead[address] = GroupAddressConfig{ + Name: config.NameFor(addressConfig), + ReadStartup: true, + } + } + return toRead +} From a97444808f1deb902fb0b7daf218affab057e5f4 Mon Sep 17 00:00:00 2001 From: nomisim <85902312+nomisim@users.noreply.github.com> Date: Tue, 22 Aug 2023 23:18:30 +0200 Subject: [PATCH 2/7] fix unnecessary cast to time.Duration in pkg/knx/startup-reader.go Co-authored-by: Christian Fritz --- pkg/knx/startup-reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/knx/startup-reader.go b/pkg/knx/startup-reader.go index 2647c34..1ab138f 100644 --- a/pkg/knx/startup-reader.go +++ b/pkg/knx/startup-reader.go @@ -42,7 +42,7 @@ func NewStartupReader(config *Config, client GroupClient, metricsHandler MetricS func (s *startupReader) Run() { readInterval := time.Duration(s.config.ReadStartupInterval) if readInterval.Milliseconds() <= 0 { - readInterval = time.Duration(200 * time.Millisecond) + readInterval = 200 * time.Millisecond } logrus.Infof("start reading addresses after startup in %dms intervals.", readInterval.Milliseconds()) s.ticker = time.NewTicker(readInterval) From 3a227555331d17bf1c973ff0a73c87881d920e5d Mon Sep 17 00:00:00 2001 From: simon mittelberger <> Date: Tue, 22 Aug 2023 23:24:38 +0200 Subject: [PATCH 3/7] omit passing of s *startupReader in go routing in pkg/knx/startup-reader.go --- pkg/knx/startup-reader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/knx/startup-reader.go b/pkg/knx/startup-reader.go index 1ab138f..c01711a 100644 --- a/pkg/knx/startup-reader.go +++ b/pkg/knx/startup-reader.go @@ -47,7 +47,7 @@ func (s *startupReader) Run() { logrus.Infof("start reading addresses after startup in %dms intervals.", readInterval.Milliseconds()) s.ticker = time.NewTicker(readInterval) c := s.ticker.C - go func(s *startupReader) { + go func() { addressesToRead := reflect.ValueOf(s.metricsToRead).MapKeys() for range c { if len(addressesToRead) == 0 { @@ -58,7 +58,7 @@ func (s *startupReader) Run() { addressesToRead = addressesToRead[1:] } s.ticker.Stop() - }(s) + }() } func (s *startupReader) Close() { From 7f393ecee5e65a35b7f3e24a3568ec6c374b9ae1 Mon Sep 17 00:00:00 2001 From: simon mittelberger <> Date: Thu, 24 Aug 2023 20:44:08 +0200 Subject: [PATCH 4/7] add description of ReadStartupInterval and ReadStartup to README.md --- README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/README.md b/README.md index 67e5148..127e23c 100644 --- a/README.md +++ b/README.md @@ -92,12 +92,14 @@ Connection: Endpoint: "192.168.1.15:3671" PhysicalAddress: 2.0.1 MetricsPrefix: knx_ +ReadStartupInterval: 200ms AddressConfigs: 0/0/1: Name: dummy_metric DPT: 1.* Export: true MetricType: "counter" + ReadStartup: true ReadActive: true MaxAge: 10m Comment: dummy comment @@ -125,6 +127,12 @@ The `MetricsPrefix` defines a single string that will be added to all your expor format must be compliant with the [prometheus metrics names](https://prometheus.io/docs/practices/naming/#metric-names). +#### The `ReadStartupInterval` +It is possible to send `GroupValueRead` telegrams to specific group addresses at startup. +Sending out all `GroupValueRead` telegrams at once on startup can overwhelm the KNX bus. +The `ReadStartupInterval` defines the interval between the `GroupValueRead` telegrams sent out at startup. +If not specified, `ReadStartupInterval` is set to 200ms by default. + #### The `AddressConfigs` section The `AddressConfigs` section defines all the information about the group addresses which should be @@ -136,6 +144,7 @@ exported to prometheus. It contains the following structure for every exported g DPT: 1.* Export: true MetricType: "counter" + ReadStartup: true ReadActive: true MaxAge: 10m Comment: dummy comment @@ -164,6 +173,9 @@ Next it defines the actual information for a single group address: - `MetricType` defines the type of the exported metric. Can be either `counter` or `gauge`. See [Prometheus documentation counter vs. gauge](https://prometheus.io/docs/practices/instrumentation/#counter-vs-gauge-summary-vs-histogram) for more information about it. +- `ReadStartup` can either be `true` or `false`. If set to `true` the KNX Prometheus Exporter will + send a `GroupValueRead` telegram to the group address to active ask for a new value once after startup. + In contrast to `ReadActive` this sends out a `GroupValueRead` telegram at startuo once. - `ReadActive` can either be `true` or `false`. If set to `true` the KNX Prometheus Exporter will send a `GroupValueRead` telegram to the group address to active ask for a new value if the last received value is older than `MaxAge`. From 9f1273379cb1664136dd0f9d9fdeb017c97ba4c3 Mon Sep 17 00:00:00 2001 From: simon mittelberger <> Date: Thu, 24 Aug 2023 20:47:15 +0200 Subject: [PATCH 5/7] fix typo in README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 127e23c..328d711 100644 --- a/README.md +++ b/README.md @@ -174,8 +174,8 @@ Next it defines the actual information for a single group address: [Prometheus documentation counter vs. gauge](https://prometheus.io/docs/practices/instrumentation/#counter-vs-gauge-summary-vs-histogram) for more information about it. - `ReadStartup` can either be `true` or `false`. If set to `true` the KNX Prometheus Exporter will - send a `GroupValueRead` telegram to the group address to active ask for a new value once after startup. - In contrast to `ReadActive` this sends out a `GroupValueRead` telegram at startuo once. + send a `GroupValueRead` telegram to the group address to actively ask for a new value once after startup. + In contrast to `ReadActive` this sends out a `GroupValueRead` telegram at startup once. - `ReadActive` can either be `true` or `false`. If set to `true` the KNX Prometheus Exporter will send a `GroupValueRead` telegram to the group address to active ask for a new value if the last received value is older than `MaxAge`. From 0508f6eca48026636903a7fff52c0c67c2f43fbb Mon Sep 17 00:00:00 2001 From: simon mittelberger <> Date: Thu, 24 Aug 2023 21:14:30 +0200 Subject: [PATCH 6/7] add tests for getMetricsToRead of startup-reader --- pkg/knx/startup-reader_test.go | 62 ++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 pkg/knx/startup-reader_test.go diff --git a/pkg/knx/startup-reader_test.go b/pkg/knx/startup-reader_test.go new file mode 100644 index 0000000..7fe0b05 --- /dev/null +++ b/pkg/knx/startup-reader_test.go @@ -0,0 +1,62 @@ +package knx + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_getMetricsToRead(t *testing.T) { + + tests := []struct { + name string + config *Config + want GroupAddressConfigSet + }{ + { + "empty", + &Config{AddressConfigs: GroupAddressConfigSet{}}, + GroupAddressConfigSet{}, + }, + { + "single-no-export-no-startup-read", + &Config{AddressConfigs: GroupAddressConfigSet{0: GroupAddressConfig{ReadStartup: false, Export: false}}}, + GroupAddressConfigSet{}, + }, + { + "single-no-export-startup-read", + &Config{AddressConfigs: GroupAddressConfigSet{0: GroupAddressConfig{Export: false, ReadStartup: true}}}, + GroupAddressConfigSet{}, + }, + { + "single-export-no-startup-read", + &Config{AddressConfigs: GroupAddressConfigSet{0: GroupAddressConfig{Export: true, ReadStartup: false}}}, + GroupAddressConfigSet{}, + }, + { + "single-export-startup-read", + &Config{AddressConfigs: GroupAddressConfigSet{0: GroupAddressConfig{Export: true, ReadStartup: true}}}, + GroupAddressConfigSet{0: GroupAddressConfig{ReadStartup: true}}, + }, + { + "multiple-export-startup-read", + &Config{AddressConfigs: GroupAddressConfigSet{ + 0: GroupAddressConfig{Export: false, ReadStartup: false}, + 1: GroupAddressConfig{Export: true, ReadStartup: false}, + 2: GroupAddressConfig{Export: false, ReadStartup: true}, + 3: GroupAddressConfig{Export: true, ReadStartup: true}, + 4: GroupAddressConfig{Export: true, ReadStartup: true}, + }}, + GroupAddressConfigSet{ + 3: GroupAddressConfig{ReadStartup: true}, + 4: GroupAddressConfig{ReadStartup: true}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := getMetricsToRead(tt.config) + assert.Equal(t, tt.want, got) + }) + } +} From c2c9ac138552e632528730ae19ad38dc5f69777d Mon Sep 17 00:00:00 2001 From: simon mittelberger <> Date: Thu, 24 Aug 2023 21:26:36 +0200 Subject: [PATCH 7/7] add test for complete StartupReader --- pkg/knx/fixtures/readConfig.yaml | 5 +++++ pkg/knx/startup-reader_test.go | 37 ++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/pkg/knx/fixtures/readConfig.yaml b/pkg/knx/fixtures/readConfig.yaml index 93971ea..854a6d6 100644 --- a/pkg/knx/fixtures/readConfig.yaml +++ b/pkg/knx/fixtures/readConfig.yaml @@ -3,12 +3,14 @@ Connection: Endpoint: "224.0.0.120:3672" PhysicalAddress: 2.0.1 MetricsPrefix: knx_ +ReadStartupInterval: 250ms AddressConfigs: 0/0/1: Name: dummy_metric DPT: 1.* Export: true MetricType: "counter" + ReadStartup: true ReadActive: true MaxAge: 5s 0/0/2: @@ -16,6 +18,7 @@ AddressConfigs: DPT: 1.* Export: true MetricType: "counter" + ReadStartup: true ReadActive: true MaxAge: 5s 0/0/3: @@ -23,6 +26,7 @@ AddressConfigs: DPT: 1.* Export: true MetricType: "counter" + ReadStartup: true ReadActive: true MaxAge: 5s 0/0/4: @@ -30,5 +34,6 @@ AddressConfigs: DPT: 1.* Export: true MetricType: "counter" + ReadStartup: false ReadActive: false MaxAge: 5s \ No newline at end of file diff --git a/pkg/knx/startup-reader_test.go b/pkg/knx/startup-reader_test.go index 7fe0b05..2ba6549 100644 --- a/pkg/knx/startup-reader_test.go +++ b/pkg/knx/startup-reader_test.go @@ -2,8 +2,13 @@ package knx import ( "testing" + "time" + gomock "github.com/golang/mock/gomock" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + "github.com/vapourismo/knx-go/knx" + "github.com/vapourismo/knx-go/knx/cemi" ) func Test_getMetricsToRead(t *testing.T) { @@ -60,3 +65,35 @@ func Test_getMetricsToRead(t *testing.T) { }) } } + +func TestStartupReader(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + groupClient := NewMockGroupClient(ctrl) + mockSnapshotHandler := NewMockMetricSnapshotHandler(ctrl) + messageCounter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"direction", "processed"}) + + config, err := ReadConfig("fixtures/readConfig.yaml") + + assert.NoError(t, err) + + groupClient.EXPECT().Send(knx.GroupEvent{ + Command: knx.GroupRead, Source: cemi.NewIndividualAddr3(2, 0, 1), Destination: cemi.NewGroupAddr3(0, 0, 1), + }).Times(1) + groupClient.EXPECT().Send(knx.GroupEvent{ + Command: knx.GroupRead, Source: cemi.NewIndividualAddr3(2, 0, 1), Destination: cemi.NewGroupAddr3(0, 0, 2), + }).Times(1) + groupClient.EXPECT().Send(knx.GroupEvent{ + Command: knx.GroupRead, Source: cemi.NewIndividualAddr3(2, 0, 1), Destination: cemi.NewGroupAddr3(0, 0, 3), + }).Times(1) + groupClient.EXPECT().Send(knx.GroupEvent{ + Command: knx.GroupRead, Source: cemi.NewIndividualAddr3(2, 0, 1), Destination: cemi.NewGroupAddr3(0, 0, 4), + }).Times(0) + + s := NewStartupReader(config, groupClient, mockSnapshotHandler, messageCounter) + s.Run() + time.Sleep(2000 * time.Millisecond) + + s.Close() +}