From c2ffd4a14be1cad0b8871180830783a6b8e21858 Mon Sep 17 00:00:00 2001 From: Blake Arnold Date: Fri, 26 May 2023 16:10:05 -0400 Subject: [PATCH] refresh udp ip at some cadence --- statsd/options.go | 13 ++++++++++ statsd/statsd.go | 8 +++--- statsd/telemetry.go | 4 +-- statsd/udp.go | 59 ++++++++++++++++++++++++++++++++++++++++----- 4 files changed, 72 insertions(+), 12 deletions(-) diff --git a/statsd/options.go b/statsd/options.go index 0728a976b..82556dc09 100644 --- a/statsd/options.go +++ b/statsd/options.go @@ -24,6 +24,7 @@ var ( defaultAggregation = true defaultExtendedAggregation = false defaultOriginDetection = true + defaultUDPAddrRefreshRate = 0 * time.Second ) // Options contains the configuration options for a client. @@ -46,6 +47,7 @@ type Options struct { telemetryAddr string originDetection bool containerID string + udpAddrRefreshRate time.Duration } func resolveOptions(options []Option) (*Options, error) { @@ -66,6 +68,7 @@ func resolveOptions(options []Option) (*Options, error) { aggregation: defaultAggregation, extendedAggregation: defaultExtendedAggregation, originDetection: defaultOriginDetection, + udpAddrRefreshRate: defaultUDPAddrRefreshRate, } for _, option := range options { @@ -346,3 +349,13 @@ func WithContainerID(id string) Option { return nil } } + +// WithUDPAddrRefreshRate sets the interval at which the client refreshes the UDP address. +// This is useful when using the Agent's address may change during deployments without a fixed IP. +// A value of 0 disables the refresh. +func WithUDPAddrRefreshRate(rate time.Duration) Option { + return func(o *Options) error { + o.udpAddrRefreshRate = rate + return nil + } +} diff --git a/statsd/statsd.go b/statsd/statsd.go index 378581b9b..98ef8c9f4 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -338,7 +338,7 @@ func parseAgentURL(agentURL string) string { return "" } -func createWriter(addr string, writeTimeout time.Duration) (io.WriteCloser, string, error) { +func createWriter(addr string, writeTimeout time.Duration, udpAddrRefreshRate time.Duration) (io.WriteCloser, string, error) { addr = resolveAddr(addr) if addr == "" { return nil, "", errors.New("No address passed and autodetection from environment failed") @@ -352,7 +352,7 @@ func createWriter(addr string, writeTimeout time.Duration) (io.WriteCloser, stri w, err := newUDSWriter(addr[len(UnixAddressPrefix):], writeTimeout) return w, writerNameUDS, err default: - w, err := newUDPWriter(addr, writeTimeout) + w, err := newUDPWriter(addr, writeTimeout, udpAddrRefreshRate) return w, writerNameUDP, err } } @@ -365,7 +365,7 @@ func New(addr string, options ...Option) (*Client, error) { return nil, err } - w, writerType, err := createWriter(addr, o.writeTimeout) + w, writerType, err := createWriter(addr, o.writeTimeout, o.udpAddrRefreshRate) if err != nil { return nil, err } @@ -494,7 +494,7 @@ func newWithWriter(w io.WriteCloser, o *Options, writerName string) (*Client, er c.telemetryClient = newTelemetryClient(&c, writerName, c.agg != nil) } else { var err error - c.telemetryClient, err = newTelemetryClientWithCustomAddr(&c, writerName, o.telemetryAddr, c.agg != nil, bufferPool, o.writeTimeout) + c.telemetryClient, err = newTelemetryClientWithCustomAddr(&c, writerName, o.telemetryAddr, c.agg != nil, bufferPool, o.writeTimeout, o.udpAddrRefreshRate) if err != nil { return nil, err } diff --git a/statsd/telemetry.go b/statsd/telemetry.go index 1e2bc0a3f..36e4a959e 100644 --- a/statsd/telemetry.go +++ b/statsd/telemetry.go @@ -139,8 +139,8 @@ func newTelemetryClient(c *Client, transport string, aggregationEnabled bool) *t return t } -func newTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration) (*telemetryClient, error) { - telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout) +func newTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration, udpAddrRefreshRate time.Duration) (*telemetryClient, error) { + telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout, udpAddrRefreshRate) if err != nil { return nil, fmt.Errorf("Could not resolve telemetry address: %v", err) } diff --git a/statsd/udp.go b/statsd/udp.go index e2922a911..3344abcd3 100644 --- a/statsd/udp.go +++ b/statsd/udp.go @@ -2,33 +2,80 @@ package statsd import ( "net" + "sync" "time" ) // udpWriter is an internal class wrapping around management of UDP connection type udpWriter struct { - conn net.Conn + conn net.PacketConn + addr string + dst *dstValue + closed chan struct{} +} + +type dstValue struct { + mutex sync.RWMutex + dst *net.UDPAddr +} + +func (d *dstValue) set(dst *net.UDPAddr) { + d.mutex.Lock() + defer d.mutex.Unlock() + d.dst = dst +} + +func (d *dstValue) get() *net.UDPAddr { + d.mutex.RLock() + defer d.mutex.RUnlock() + return d.dst } // New returns a pointer to a new udpWriter given an addr in the format "hostname:port". -func newUDPWriter(addr string, _ time.Duration) (*udpWriter, error) { - udpAddr, err := net.ResolveUDPAddr("udp", addr) +func newUDPWriter(addr string, _ time.Duration, refreshRate time.Duration) (*udpWriter, error) { + conn, err := net.ListenPacket("udp", ":0") if err != nil { return nil, err } - conn, err := net.DialUDP("udp", nil, udpAddr) + currentDst, err := getCurrentDst(addr) if err != nil { return nil, err } - writer := &udpWriter{conn: conn} + dst := &dstValue{dst: currentDst} + writer := &udpWriter{conn: conn, addr: addr, dst: dst, closed: make(chan struct{})} + if refreshRate > 0 { + go writer.refreshDstLoop(refreshRate) + } return writer, nil } +func (w *udpWriter) refreshDstLoop(refreshRate time.Duration) { + ticker := time.NewTicker(refreshRate) + defer ticker.Stop() + for { + select { + case <-w.closed: + return + case <-ticker.C: + dst, err := getCurrentDst(w.addr) + if err != nil { + continue + } + w.dst.set(dst) + } + } +} + // Write data to the UDP connection with no error handling func (w *udpWriter) Write(data []byte) (int, error) { - return w.conn.Write(data) + return w.conn.WriteTo(data, w.dst.get()) } func (w *udpWriter) Close() error { + close(w.closed) return w.conn.Close() } + +func getCurrentDst(addr string) (*net.UDPAddr, error) { + return net.ResolveUDPAddr("udp", addr) +}