Skip to content

Commit

Permalink
Improves active read of values.
Browse files Browse the repository at this point in the history
Now it is possible to choose how values for metrics should be read
actively. The default method is as already known sending a GroupRead
message to the GroupAddress of the value. New is to send a GroupWrite
message to a different GroupAddress which acts as a trigger for the
device to send the requested values.
  • Loading branch information
chr-fritz committed Aug 15, 2024
1 parent 0f7d425 commit e442629
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 66 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ KNX Prometheus Exporter will identify itself within it. It has three properties:
- `Endpoint` This defines the ip address or hostname including the port to where the KNX Prometheus
Exporter should open the connection. In case of you are using `Router` in `Type` the default might
be `224.0.23.12:3671`.
- `PhysicalAddress` This defines the physical address of how the KNX Prometheus Exporter will identify
- `PhysicalAddress` This defines the physical address of how the KNX Prometheus Exporter will
identify
itself within your KNX address.
- `RouterConfig` This defines additional
- `TunnelConfig` contains some specific configurations if Type is Tunnel
Expand Down Expand Up @@ -175,6 +176,9 @@ exported to Prometheus. It contains the following structure for every exported g
ReadActive: true
MaxAge: 10m
Comment: dummy comment
ReadType: WriteOther
ReadAddress: 0/0/2
ReadBody: [ 0x1 ]
Labels:
room: office
```
Expand Down Expand Up @@ -206,6 +210,12 @@ Next it defines the actual information for a single group address:
- `ReadActive` can either be `true` or `false`. If set to `true` the KNX Prometheus Exporter will
send a `GroupValueRead` telegram to the group address to active ask for a new value if the last
received value is older than `MaxAge`.
- `ReadType` defines the type how to trigger the read request. Possible Values are `GroupRead` and
`WriteOther`. Default is `GroupRead`.
- `ReadAddress` defines the group address to which address a `GroupWrite` request should be sent to
initiate sending the data if `ReadType` is set to `WriteOther`.
- `ReadBody` is a byte array with the content to sent to `ReadAddress` if `ReadType` is set to
`WriteOther`.
- `MaxAge` defines the maximum age of a value until the KNX Prometheus Exporter will send a
`GroupValueRead` telegram to active request a new value for the group address. This setting will
be ignored if `ReadActive` is set to `false`.
Expand Down
35 changes: 33 additions & 2 deletions pkg/knx/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *Config) NameForGa(address GroupAddress) string {
}

// NameFor return s the full metric name for the given GroupAddressConfig.
func (c *Config) NameFor(gaConfig GroupAddressConfig) string {
func (c *Config) NameFor(gaConfig *GroupAddressConfig) string {
return c.MetricsPrefix + gaConfig.Name
}

Expand Down Expand Up @@ -197,6 +197,31 @@ func (d *Duration) UnmarshalJSON(data []byte) error {
return nil
}

type ReadType string

const GroupRead = ReadType("GroupRead")
const WriteOther = ReadType("WriteOther")

func (t ReadType) MarshalJSON() ([]byte, error) {
return json.Marshal(string(t))
}

func (t *ReadType) UnmarshalJSON(data []byte) error {
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}
switch strings.ToLower(str) {
case "groupread":
*t = GroupRead
case "writeother":
*t = WriteOther
default:
*t = GroupRead
}
return nil
}

// GroupAddressConfig defines all information to map a KNX group address to a prometheus metric.
type GroupAddressConfig struct {
// Name defines the prometheus metric name without the MetricsPrefix.
Expand All @@ -213,11 +238,17 @@ type GroupAddressConfig struct {
ReadStartup bool `json:",omitempty"`
// ReadActive allows the exporter to actively send `GroupValueRead` telegrams to actively poll the value instead waiting for it.
ReadActive bool `json:",omitempty"`
// ReadType defines the type how to trigger the read request. Possible Values are GroupRead and WriteOther.
ReadType ReadType `json:",omitempty"`
// ReadAddress defines the group address to which address a GroupWrite request should be sent to initiate sending the data if ReadType is set to WriteOther.
ReadAddress GroupAddress `json:",omitempty"`
// ReadBody is a byte array with the content to sent to ReadAddress if ReadType is set to WriteOther.
ReadBody []byte `json:",omitempty"`
// MaxAge of a value until it will actively send a `GroupValueRead` telegram to read the value if ReadActive is set to true.
MaxAge Duration `json:",omitempty"`
// Labels defines static labels that should be set when exporting the metric using prometheus.
Labels map[string]string `json:",omitempty"`
}

// GroupAddressConfigSet is a shortcut type for the group address config map.
type GroupAddressConfigSet map[GroupAddress]GroupAddressConfig
type GroupAddressConfigSet map[GroupAddress]*GroupAddressConfig
18 changes: 11 additions & 7 deletions pkg/knx/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,18 @@ func TestReadConfig(t *testing.T) {
},
},
MetricsPrefix: "knx_",
AddressConfigs: map[GroupAddress]GroupAddressConfig{
AddressConfigs: map[GroupAddress]*GroupAddressConfig{
1: {
Name: "dummy_metric",
DPT: "1.*",
MetricType: "counter",
Export: true,
ReadActive: true,
MaxAge: Duration(10 * time.Minute),
Name: "dummy_metric",
DPT: "1.*",
MetricType: "counter",
Export: true,
ReadActive: true,
MaxAge: Duration(10 * time.Minute),
ReadStartup: true,
ReadType: WriteOther,
ReadAddress: 2,
ReadBody: []byte{1},
},
},
}, false},
Expand Down
6 changes: 3 additions & 3 deletions pkg/knx/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func collectGroupAddresses(groupRange []export.GroupRange) []export.GroupAddress
return addresses
}

func convertAddresses(groupAddresses []export.GroupAddress) map[GroupAddress]GroupAddressConfig {
addressConfigs := make(map[GroupAddress]GroupAddressConfig)
func convertAddresses(groupAddresses []export.GroupAddress) map[GroupAddress]*GroupAddressConfig {
addressConfigs := make(map[GroupAddress]*GroupAddressConfig)
for _, ga := range groupAddresses {
logger := logrus.WithField("address", ga.Address)
address, err := NewGroupAddress(ga.Address)
Expand All @@ -108,7 +108,7 @@ func convertAddresses(groupAddresses []export.GroupAddress) map[GroupAddress]Gro
if err != nil {
logger.Info("Can not normalize data type, ", err)
}
cfg := GroupAddressConfig{
cfg := &GroupAddressConfig{
Name: name,
Comment: ga.Name + "\n" + ga.Description,
DPT: dpt,
Expand Down
4 changes: 4 additions & 0 deletions pkg/knx/fixtures/full-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ AddressConfigs:
MetricType: "counter"
ReadActive: true
MaxAge: 10m
ReadStartup: true
ReadType: WriteOther
ReadAddress: 0/0/2
ReadBody: [ 0x1 ]
4 changes: 2 additions & 2 deletions pkg/knx/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ func (l *listener) handleEvent(event knx.GroupEvent) {
value: floatValue,
source: PhysicalAddress(event.Source),
timestamp: time.Now(),
config: &addr,
config: addr,
destination: destination,
}
l.messageCounter.WithLabelValues("received", "true").Inc()
}

func unpackEvent(event knx.GroupEvent, addr GroupAddressConfig) (DPT, error) {
func unpackEvent(event knx.GroupEvent, addr *GroupAddressConfig) (DPT, error) {
v, found := dpt.Produce(addr.DPT)
if !found {
return nil, fmt.Errorf("can not find dpt description for \"%s\" to unpack %s telegram from %s for %s",
Expand Down
2 changes: 1 addition & 1 deletion pkg/knx/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func Test_listener_Run(t *testing.T) {
l := NewListener(
&Config{
MetricsPrefix: "knx_",
AddressConfigs: map[GroupAddress]GroupAddressConfig{
AddressConfigs: map[GroupAddress]*GroupAddressConfig{
GroupAddress(1): {Name: "a", DPT: "1.001", Export: true},
GroupAddress(2): {Name: "b", DPT: "5.001", Export: true},
GroupAddress(3): {Name: "c", DPT: "9.001", Export: true},
Expand Down
30 changes: 20 additions & 10 deletions pkg/knx/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (p *poller) pollAddresses(t time.Time) {
if s == nil {
logrus.WithField("address", address).
Tracef("Initial poll of %s", address.String())
p.sendReadMessage(address)
p.sendReadMessage(address, config)
continue
}

Expand All @@ -96,16 +96,23 @@ func (p *poller) pollAddresses(t time.Time) {
address.String(),
diff.String(),
maxAge.String())
p.sendReadMessage(address)
p.sendReadMessage(address, config)
}
}
}

func (p *poller) sendReadMessage(address GroupAddress) {
func (p *poller) sendReadMessage(address GroupAddress, config *GroupAddressConfig) {
event := knx.GroupEvent{
Command: knx.GroupRead,
Destination: cemi.GroupAddr(address),
Source: cemi.IndividualAddr(p.config.Connection.PhysicalAddress),
Command: knx.GroupRead,
Source: cemi.IndividualAddr(p.config.Connection.PhysicalAddress),
}

if config.ReadType == WriteOther {
event.Command = knx.GroupWrite
event.Destination = cemi.GroupAddr(config.ReadAddress)
event.Data = config.ReadBody
} else {
event.Destination = cemi.GroupAddr(address)
}

if e := p.client.Send(event); e != nil {
Expand All @@ -124,10 +131,13 @@ func getMetricsToPoll(config *Config) GroupAddressConfigSet {
}

interval = time.Duration(math.Max(float64(interval), float64(5*time.Second)))
toPoll[address] = GroupAddressConfig{
Name: config.NameFor(addressConfig),
ReadActive: true,
MaxAge: Duration(interval),
toPoll[address] = &GroupAddressConfig{
Name: config.NameFor(addressConfig),
ReadActive: true,
ReadType: addressConfig.ReadType,
ReadAddress: addressConfig.ReadAddress,
ReadBody: addressConfig.ReadBody,
MaxAge: Duration(interval),
}
}
return toPoll
Expand Down
26 changes: 13 additions & 13 deletions pkg/knx/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,28 @@ func Test_getMetricsToPoll(t *testing.T) {
want GroupAddressConfigSet
}{
{"empty", &Config{AddressConfigs: GroupAddressConfigSet{}}, GroupAddressConfigSet{}},
{"single-no-active-read", &Config{AddressConfigs: GroupAddressConfigSet{0: GroupAddressConfig{ReadActive: false}}}, GroupAddressConfigSet{}},
{"single-too-small-interval", &Config{AddressConfigs: GroupAddressConfigSet{0: GroupAddressConfig{ReadActive: true, MaxAge: Duration(10 * time.Millisecond)}}}, GroupAddressConfigSet{}},
{"single-no-export", &Config{AddressConfigs: GroupAddressConfigSet{0: GroupAddressConfig{ReadActive: true, MaxAge: Duration(10 * time.Second), Export: false}}}, GroupAddressConfigSet{}},
{"single-no-active-read", &Config{AddressConfigs: GroupAddressConfigSet{0: &GroupAddressConfig{ReadActive: false}}}, GroupAddressConfigSet{}},
{"single-too-small-interval", &Config{AddressConfigs: GroupAddressConfigSet{0: &GroupAddressConfig{ReadActive: true, MaxAge: Duration(10 * time.Millisecond)}}}, GroupAddressConfigSet{}},
{"single-no-export", &Config{AddressConfigs: GroupAddressConfigSet{0: &GroupAddressConfig{ReadActive: true, MaxAge: Duration(10 * time.Second), Export: false}}}, GroupAddressConfigSet{}},
{"single-small-interval", &Config{
AddressConfigs: GroupAddressConfigSet{0: GroupAddressConfig{ReadActive: true, MaxAge: Duration(1 * time.Second), Name: "a", Export: true}},
AddressConfigs: GroupAddressConfigSet{0: &GroupAddressConfig{ReadActive: true, MaxAge: Duration(1 * time.Second), Name: "a", Export: true}},
MetricsPrefix: "knx_",
}, GroupAddressConfigSet{0: GroupAddressConfig{Name: "knx_a", ReadActive: true, MaxAge: Duration(5 * time.Second)}}},
}, GroupAddressConfigSet{0: &GroupAddressConfig{Name: "knx_a", ReadActive: true, MaxAge: Duration(5 * time.Second)}}},
{"single", &Config{
AddressConfigs: GroupAddressConfigSet{0: GroupAddressConfig{ReadActive: true, MaxAge: Duration(10 * time.Second), Name: "a", Export: true}},
AddressConfigs: GroupAddressConfigSet{0: &GroupAddressConfig{ReadActive: true, MaxAge: Duration(10 * time.Second), Name: "a", Export: true}},
MetricsPrefix: "knx_",
}, GroupAddressConfigSet{0: GroupAddressConfig{Name: "knx_a", ReadActive: true, MaxAge: Duration(10 * time.Second)}}},
}, GroupAddressConfigSet{0: &GroupAddressConfig{Name: "knx_a", ReadActive: true, MaxAge: Duration(10 * time.Second)}}},
{"multiple", &Config{
AddressConfigs: GroupAddressConfigSet{
0: GroupAddressConfig{ReadActive: true, MaxAge: Duration(10 * time.Second), Name: "a", Export: true},
1: GroupAddressConfig{ReadActive: true, MaxAge: Duration(15 * time.Second), Name: "b", Export: true},
2: GroupAddressConfig{ReadActive: true, MaxAge: Duration(45 * time.Second), Name: "c", Export: true},
0: &GroupAddressConfig{ReadActive: true, MaxAge: Duration(10 * time.Second), Name: "a", Export: true},
1: &GroupAddressConfig{ReadActive: true, MaxAge: Duration(15 * time.Second), Name: "b", Export: true},
2: &GroupAddressConfig{ReadActive: true, MaxAge: Duration(45 * time.Second), Name: "c", Export: true},
},
MetricsPrefix: "knx_",
}, GroupAddressConfigSet{
0: GroupAddressConfig{Name: "knx_a", ReadActive: true, MaxAge: Duration(10 * time.Second)},
1: GroupAddressConfig{Name: "knx_b", ReadActive: true, MaxAge: Duration(15 * time.Second)},
2: GroupAddressConfig{Name: "knx_c", ReadActive: true, MaxAge: Duration(45 * time.Second)},
0: &GroupAddressConfig{Name: "knx_a", ReadActive: true, MaxAge: Duration(10 * time.Second)},
1: &GroupAddressConfig{Name: "knx_b", ReadActive: true, MaxAge: Duration(15 * time.Second)},
2: &GroupAddressConfig{Name: "knx_c", ReadActive: true, MaxAge: Duration(45 * time.Second)},
}},
}
for _, tt := range tests {
Expand Down
33 changes: 18 additions & 15 deletions pkg/knx/startup-reader.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package knx

import (
"reflect"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -46,16 +45,10 @@ func (s *startupReader) Run() {
}
logrus.Infof("start reading addresses after startup in %dms intervals.", readInterval.Milliseconds())
s.ticker = time.NewTicker(readInterval)
c := s.ticker.C
go func() {
addressesToRead := reflect.ValueOf(s.metricsToRead).MapKeys()
for range c {
if len(addressesToRead) == 0 {
break
}
addressToRead := addressesToRead[0].Interface().(GroupAddress)
s.sendReadMessage(addressToRead)
addressesToRead = addressesToRead[1:]
for address, config := range s.metricsToRead {
<-s.ticker.C
s.sendReadMessage(address, config)
}
s.ticker.Stop()
}()
Expand All @@ -67,11 +60,18 @@ func (s *startupReader) Close() {
}
}

func (s *startupReader) sendReadMessage(address GroupAddress) {
func (s *startupReader) sendReadMessage(address GroupAddress, config *GroupAddressConfig) {
event := knx.GroupEvent{
Command: knx.GroupRead,
Destination: cemi.GroupAddr(address),
Source: cemi.IndividualAddr(s.config.Connection.PhysicalAddress),
Command: knx.GroupRead,
Source: cemi.IndividualAddr(s.config.Connection.PhysicalAddress),
}

if config.ReadType == WriteOther {
event.Command = knx.GroupWrite
event.Destination = cemi.GroupAddr(config.ReadAddress)
event.Data = config.ReadBody
} else {
event.Destination = cemi.GroupAddr(address)
}

if e := s.client.Send(event); e != nil {
Expand All @@ -88,9 +88,12 @@ func getMetricsToRead(config *Config) GroupAddressConfigSet {
continue
}

toRead[address] = GroupAddressConfig{
toRead[address] = &GroupAddressConfig{
Name: config.NameFor(addressConfig),
ReadStartup: true,
ReadType: addressConfig.ReadType,
ReadAddress: addressConfig.ReadAddress,
ReadBody: addressConfig.ReadBody,
}
}
return toRead
Expand Down
24 changes: 12 additions & 12 deletions pkg/knx/startup-reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,36 @@ func Test_getMetricsToRead(t *testing.T) {
},
{
"single-no-export-no-startup-read",
&Config{AddressConfigs: GroupAddressConfigSet{0: GroupAddressConfig{ReadStartup: false, Export: false}}},
&Config{AddressConfigs: GroupAddressConfigSet{0: &GroupAddressConfig{ReadStartup: false, Export: false}}},
GroupAddressConfigSet{},
},
{
"single-no-export-startup-read",
&Config{AddressConfigs: GroupAddressConfigSet{0: GroupAddressConfig{Export: false, ReadStartup: true}}},
&Config{AddressConfigs: GroupAddressConfigSet{0: &GroupAddressConfig{Export: false, ReadStartup: true}}},
GroupAddressConfigSet{},
},
{
"single-export-no-startup-read",
&Config{AddressConfigs: GroupAddressConfigSet{0: GroupAddressConfig{Export: true, ReadStartup: false}}},
&Config{AddressConfigs: GroupAddressConfigSet{0: &GroupAddressConfig{Export: true, ReadStartup: false}}},
GroupAddressConfigSet{},
},
{
"single-export-startup-read",
&Config{AddressConfigs: GroupAddressConfigSet{0: GroupAddressConfig{Export: true, ReadStartup: true}}},
GroupAddressConfigSet{0: GroupAddressConfig{ReadStartup: true}},
&Config{AddressConfigs: GroupAddressConfigSet{0: &GroupAddressConfig{Export: true, ReadStartup: true}}},
GroupAddressConfigSet{0: &GroupAddressConfig{ReadStartup: true}},
},
{
"multiple-export-startup-read",
&Config{AddressConfigs: GroupAddressConfigSet{
0: GroupAddressConfig{Export: false, ReadStartup: false},
1: GroupAddressConfig{Export: true, ReadStartup: false},
2: GroupAddressConfig{Export: false, ReadStartup: true},
3: GroupAddressConfig{Export: true, ReadStartup: true},
4: GroupAddressConfig{Export: true, ReadStartup: true},
0: &GroupAddressConfig{Export: false, ReadStartup: false},
1: &GroupAddressConfig{Export: true, ReadStartup: false},
2: &GroupAddressConfig{Export: false, ReadStartup: true},
3: &GroupAddressConfig{Export: true, ReadStartup: true},
4: &GroupAddressConfig{Export: true, ReadStartup: true},
}},
GroupAddressConfigSet{
3: GroupAddressConfig{ReadStartup: true},
4: GroupAddressConfig{ReadStartup: true},
3: &GroupAddressConfig{ReadStartup: true},
4: &GroupAddressConfig{ReadStartup: true},
},
},
}
Expand Down

0 comments on commit e442629

Please sign in to comment.