-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathaddrmon.go
138 lines (120 loc) · 3.03 KB
/
addrmon.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Package addrmon contains the address monitor.
package addrmon
import (
"fmt"
"net/netip"
log "github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"
)
// Update is an address update.
//
// The address monitor creates one Update for every address event it
// forwards and sends it on the channel returned by AddrMon.Updates.
type Update struct {
// Add is copied from the NewAddr field of the address event.
// NOTE(review): presumably true for an added and false for a removed
// address — confirm against the netlink documentation.
Add bool
// Address is the prefix built from the IP and the mask length of the
// event's link address.
Address netip.Prefix
// Index is copied from the LinkIndex field of the address event.
Index int
}
// AddrMon is an address monitor.
//
// It receives address events from a netlink subscription, converts them
// into Updates and forwards them on the updates channel.
type AddrMon struct {
// events is the channel of address events returned by
// RegisterAddrUpdates; it is set by Start and replaced by start when it
// re-registers after an unexpected close.
events chan netlink.AddrUpdate
// updates carries the forwarded address updates; it is returned by
// Updates and closed when start returns.
updates chan *Update
// upsDone is passed to the netlink subscription as its done channel and
// is closed when start returns.
upsDone chan struct{}
// done is closed by Stop to request shutdown.
done chan struct{}
// closed is closed when start returns; Stop waits on it.
closed chan struct{}
}
// sendUpdate delivers update on the updates channel.
//
// The call blocks until either a receiver takes the update or the done
// channel is closed; in the latter case the update is dropped.
func (a *AddrMon) sendUpdate(update *Update) {
	select {
	case <-a.done:
		// the monitor is being stopped, give up on this update
		return
	case a.updates <- update:
		// update was handed over to a receiver
		return
	}
}
// netlinkAddrSubscribeWithOptions is netlink.AddrSubscribeWithOptions for testing.
//
// RegisterAddrUpdates subscribes through this package-level variable instead
// of calling netlink directly, so the function can be swapped out.
var netlinkAddrSubscribeWithOptions = netlink.AddrSubscribeWithOptions
// RegisterAddrUpdates subscribes the address monitor to netlink address
// events and returns the channel the events are delivered on.
//
// The subscription is created with ListExisting enabled and uses a.upsDone
// as its done channel. On failure the returned channel is nil and the error
// wraps the subscription error.
var RegisterAddrUpdates = func(a *AddrMon) (chan netlink.AddrUpdate, error) {
	addrEvents := make(chan netlink.AddrUpdate)

	// subscribe with a listing of the existing addresses
	err := netlinkAddrSubscribeWithOptions(
		addrEvents,
		a.upsDone,
		netlink.AddrSubscribeOptions{ListExisting: true},
	)
	if err != nil {
		return nil, fmt.Errorf("could not subscribe to address events: %w", err)
	}

	return addrEvents, nil
}
// start runs the event loop of the address monitor until a.done is closed.
//
// Every address event read from a.events is converted into an Update and
// forwarded with sendUpdate. If a.events is closed unexpectedly, start tries
// to subscribe again with RegisterAddrUpdates; if that fails, no further
// events are received and the loop only waits for a.done.
//
// When start returns, the deferred calls close a.upsDone, a.updates and
// a.closed, in that order.
func (a *AddrMon) start() {
	defer close(a.closed)
	defer close(a.updates)
	defer close(a.upsDone)

	// handle events
	for {
		select {
		case e, ok := <-a.events:
			if !ok {
				// unexpected close of events, try to re-open
				log.Error("AddrMon got unexpected close of addr events")
				events, err := RegisterAddrUpdates(a)
				if err != nil {
					log.WithError(err).Error("AddrMon register addr updates error")
					// Without a subscription, park on a nil channel:
					// receiving from it blocks forever, so from now on
					// only a.done can wake up the loop.
					events = nil
				}
				a.events = events
				continue
			}

			// forward event as address update
			ip, ok := netip.AddrFromSlice(e.LinkAddress.IP)
			if !ok || !ip.IsValid() {
				log.WithField("LinkAddress", e.LinkAddress).
					Error("AddrMon got invalid IP in addr event")
				continue
			}
			// only the number of leading ones is needed; Size reports
			// 0 for a mask that is not in canonical form
			ones, _ := e.LinkAddress.Mask.Size()
			addr := netip.PrefixFrom(ip, ones)
			u := &Update{
				Address: addr,
				Index:   e.LinkIndex,
				Add:     e.NewAddr,
			}
			a.sendUpdate(u)

		case <-a.done:
			// Capture the channel so the detached goroutine does not read
			// the struct field after start has returned.
			events := a.events
			if events == nil {
				// No subscription is left (re-registration failed), so
				// there is nothing to drain; ranging over a nil channel
				// would block forever and leak the goroutine.
				return
			}

			// drain events and wait for channel shutdown; this
			// could take until the next addr update
			go func() {
				for range events {
					// wait for channel shutdown
					log.Debug("AddrMon dropping event after stop")
				}
			}()

			// stop address monitor
			return
		}
	}
}
// Start subscribes to address events and launches the event loop of the
// address monitor in its own goroutine.
//
// It returns the error of RegisterAddrUpdates if the subscription fails; in
// that case no goroutine is started.
func (a *AddrMon) Start() error {
	subscription, err := RegisterAddrUpdates(a)
	if err != nil {
		// no goroutine has been started at this point
		return err
	}

	// hand the subscription to the event loop before it starts running
	a.events = subscription
	go a.start()

	return nil
}
// Stop stops the address monitor.
//
// It closes the done channel to request shutdown and then blocks until the
// closed channel has been closed, which start does when it returns.
//
// NOTE(review): done is closed unconditionally, so a second call to Stop
// panics on the already closed channel; and closed is only closed by start,
// so Stop appears to block forever if Start was never called or failed —
// confirm that callers call Stop exactly once after a successful Start.
func (a *AddrMon) Stop() {
// signal shutdown to the event loop and to a pending sendUpdate
close(a.done)
// wait until the event loop has finished
<-a.closed
}
// Updates returns the address updates channel.
//
// The monitor sends one Update per forwarded address event on this channel.
// start closes the channel when it returns, so receivers see it closed once
// the monitor has stopped.
func (a *AddrMon) Updates() chan *Update {
return a.updates
}
// NewAddrMon creates an address monitor with its updates channel and its
// signalling channels allocated, all of them unbuffered.
//
// The events channel is left unset here; Start assigns it when it
// subscribes to address events.
func NewAddrMon() *AddrMon {
	mon := new(AddrMon)

	// channel for forwarding updates to the user of the monitor
	mon.updates = make(chan *Update)

	// channels for coordinating shutdown
	mon.upsDone = make(chan struct{})
	mon.done = make(chan struct{})
	mon.closed = make(chan struct{})

	return mon
}