forked from alexcesaro/statsd
-
Notifications
You must be signed in to change notification settings - Fork 4
/
udpconn.go
83 lines (72 loc) · 1.67 KB
/
udpconn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package statsd
import (
"fmt"
"net"
"sync/atomic"
"time"
"unsafe"
)
// UDPConn is an implementation of an io.WriteCLoser, encapsulating a
// net.PacketConn, that supports DNS changes during operation.
// See also the WriteCloser, TrimTrailingNewline, and UDPCheck options.
type UDPConn struct {
conn net.PacketConn
addr unsafe.Pointer
stop chan struct{}
done chan struct{}
}
func NewUDPConn(network, address string, rate time.Duration) (*UDPConn, error) {
if rate <= 0 {
return nil, fmt.Errorf(`invalid rate: %s`, rate)
}
addr, err := resolveUDPAddress(network, address)
if err != nil {
return nil, err
}
conn, err := listenUDP(network, nil)
if err != nil {
return nil, err
}
r := UDPConn{
conn: conn,
addr: unsafe.Pointer(addr),
stop: make(chan struct{}, 1),
done: make(chan struct{}),
}
go r.worker(network, address, rate)
return &r, nil
}
func (x *UDPConn) Write(b []byte) (int, error) {
return x.conn.WriteTo(b, (*net.UDPAddr)(atomic.LoadPointer(&x.addr)))
}
func (x *UDPConn) Close() (err error) {
select {
case x.stop <- struct{}{}:
default:
}
err = x.conn.Close()
<-x.done
return
}
func (x *UDPConn) worker(network, address string, rate time.Duration) {
defer close(x.done)
ticker := time.NewTicker(rate)
defer ticker.Stop()
for {
select {
case <-x.stop:
return
case <-ticker.C:
// TODO cancellation would be nice
newAddr, err := resolveUDPAddress(network, address)
if err != nil {
continue
}
if oldAddr := (*net.UDPAddr)(x.addr); newAddr.Port == oldAddr.Port &&
newAddr.Zone == oldAddr.Zone && newAddr.IP.Equal(oldAddr.IP) {
continue
}
atomic.StorePointer(&x.addr, unsafe.Pointer(newAddr))
}
}
}