Skip to content

Commit

Permalink
refresh udp ip at some cadence
Browse files Browse the repository at this point in the history
  • Loading branch information
blakearnold committed May 26, 2023
1 parent 70cab92 commit c2ffd4a
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 12 deletions.
13 changes: 13 additions & 0 deletions statsd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var (
defaultAggregation = true
defaultExtendedAggregation = false
defaultOriginDetection = true
defaultUDPAddrRefreshRate = 0 * time.Second
)

// Options contains the configuration options for a client.
Expand All @@ -46,6 +47,7 @@ type Options struct {
telemetryAddr string
originDetection bool
containerID string
udpAddrRefreshRate time.Duration
}

func resolveOptions(options []Option) (*Options, error) {
Expand All @@ -66,6 +68,7 @@ func resolveOptions(options []Option) (*Options, error) {
aggregation: defaultAggregation,
extendedAggregation: defaultExtendedAggregation,
originDetection: defaultOriginDetection,
udpAddrRefreshRate: defaultUDPAddrRefreshRate,
}

for _, option := range options {
Expand Down Expand Up @@ -346,3 +349,13 @@ func WithContainerID(id string) Option {
return nil
}
}

// WithUDPAddrRefreshRate sets the interval at which the client refreshes the UDP address.
// This is useful when using the Agent's address may change during deployments without a fixed IP.
// A value of 0 disables the refresh.
func WithUDPAddrRefreshRate(rate time.Duration) Option {
return func(o *Options) error {
o.udpAddrRefreshRate = rate
return nil
}
}
8 changes: 4 additions & 4 deletions statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func parseAgentURL(agentURL string) string {
return ""
}

func createWriter(addr string, writeTimeout time.Duration) (io.WriteCloser, string, error) {
func createWriter(addr string, writeTimeout time.Duration, udpAddrRefreshRate time.Duration) (io.WriteCloser, string, error) {
addr = resolveAddr(addr)
if addr == "" {
return nil, "", errors.New("No address passed and autodetection from environment failed")
Expand All @@ -352,7 +352,7 @@ func createWriter(addr string, writeTimeout time.Duration) (io.WriteCloser, stri
w, err := newUDSWriter(addr[len(UnixAddressPrefix):], writeTimeout)
return w, writerNameUDS, err
default:
w, err := newUDPWriter(addr, writeTimeout)
w, err := newUDPWriter(addr, writeTimeout, udpAddrRefreshRate)
return w, writerNameUDP, err
}
}
Expand All @@ -365,7 +365,7 @@ func New(addr string, options ...Option) (*Client, error) {
return nil, err
}

w, writerType, err := createWriter(addr, o.writeTimeout)
w, writerType, err := createWriter(addr, o.writeTimeout, o.udpAddrRefreshRate)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -494,7 +494,7 @@ func newWithWriter(w io.WriteCloser, o *Options, writerName string) (*Client, er
c.telemetryClient = newTelemetryClient(&c, writerName, c.agg != nil)
} else {
var err error
c.telemetryClient, err = newTelemetryClientWithCustomAddr(&c, writerName, o.telemetryAddr, c.agg != nil, bufferPool, o.writeTimeout)
c.telemetryClient, err = newTelemetryClientWithCustomAddr(&c, writerName, o.telemetryAddr, c.agg != nil, bufferPool, o.writeTimeout, o.udpAddrRefreshRate)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions statsd/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func newTelemetryClient(c *Client, transport string, aggregationEnabled bool) *t
return t
}

func newTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration) (*telemetryClient, error) {
telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout)
func newTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration, udpAddrRefreshRate time.Duration) (*telemetryClient, error) {
telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout, udpAddrRefreshRate)
if err != nil {
return nil, fmt.Errorf("Could not resolve telemetry address: %v", err)
}
Expand Down
59 changes: 53 additions & 6 deletions statsd/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,80 @@ package statsd

import (
"net"
"sync"
"time"
)

// udpWriter is an internal class wrapping around management of UDP connection
type udpWriter struct {
conn net.Conn
conn net.PacketConn
addr string
dst *dstValue
closed chan struct{}
}

type dstValue struct {
mutex sync.RWMutex
dst *net.UDPAddr
}

func (d *dstValue) set(dst *net.UDPAddr) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.dst = dst
}

func (d *dstValue) get() *net.UDPAddr {
d.mutex.RLock()
defer d.mutex.RUnlock()
return d.dst
}

// New returns a pointer to a new udpWriter given an addr in the format "hostname:port".
func newUDPWriter(addr string, _ time.Duration) (*udpWriter, error) {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
func newUDPWriter(addr string, _ time.Duration, refreshRate time.Duration) (*udpWriter, error) {
conn, err := net.ListenPacket("udp", ":0")
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
currentDst, err := getCurrentDst(addr)
if err != nil {
return nil, err
}
writer := &udpWriter{conn: conn}
dst := &dstValue{dst: currentDst}
writer := &udpWriter{conn: conn, addr: addr, dst: dst, closed: make(chan struct{})}
if refreshRate > 0 {
go writer.refreshDstLoop(refreshRate)
}
return writer, nil
}

func (w *udpWriter) refreshDstLoop(refreshRate time.Duration) {
ticker := time.NewTicker(refreshRate)
defer ticker.Stop()
for {
select {
case <-w.closed:
return
case <-ticker.C:
dst, err := getCurrentDst(w.addr)
if err != nil {
continue
}
w.dst.set(dst)
}
}
}

// Write data to the UDP connection with no error handling
func (w *udpWriter) Write(data []byte) (int, error) {
return w.conn.Write(data)
return w.conn.WriteTo(data, w.dst.get())
}

func (w *udpWriter) Close() error {
close(w.closed)
return w.conn.Close()
}

func getCurrentDst(addr string) (*net.UDPAddr, error) {
return net.ResolveUDPAddr("udp", addr)
}

0 comments on commit c2ffd4a

Please sign in to comment.