From 08875a36873b97de6bac7d91e5b4c75dce8f7c7e Mon Sep 17 00:00:00 2001 From: Trevor Foster Date: Tue, 1 Sep 2020 12:08:11 -0400 Subject: [PATCH] Add reconnecting udp connection type to jaeger exporter (#1063) * port reconnecting udp client from jaeger-client-go * Fix precommit issues * Fix license check * Add initial value for max packet size * Fix for atomic usage on 386 arch * Modify reconnecting option to an affirmative * Add changelog entry * Dont hold rlock for writes Co-authored-by: Tyler Yahn --- CHANGELOG.md | 6 + example/jaeger/go.sum | 1 + exporters/trace/jaeger/agent.go | 71 ++- exporters/trace/jaeger/agent_test.go | 94 ++++ exporters/trace/jaeger/go.sum | 1 + .../trace/jaeger/reconnecting_udp_client.go | 202 ++++++++ .../jaeger/reconnecting_udp_client_test.go | 462 ++++++++++++++++++ exporters/trace/jaeger/uploader.go | 44 +- 8 files changed, 862 insertions(+), 19 deletions(-) create mode 100644 exporters/trace/jaeger/agent_test.go create mode 100644 exporters/trace/jaeger/reconnecting_udp_client.go create mode 100644 exporters/trace/jaeger/reconnecting_udp_client_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index e553b56ae9c6..22a6a61bd943 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added + +- Add reconnecting udp connection type to Jaeger exporter. + This change adds a new optional implementation of the udp conn interface used to detect changes to an agent's host dns record. + It then adopts the new destination address to ensure the exporter doesn't get stuck. This change was ported from jaegertracing/jaeger-client-go#520. (#1063) + ## [0.11.0] - 2020-08-24 ### Added diff --git a/example/jaeger/go.sum b/example/jaeger/go.sum index 7cfad7a33c30..6e3f55393a7d 100644 --- a/example/jaeger/go.sum +++ b/example/jaeger/go.sum @@ -115,6 +115,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= diff --git a/exporters/trace/jaeger/agent.go b/exporters/trace/jaeger/agent.go index 7010e9626299..9eaebc25d73a 100644 --- a/exporters/trace/jaeger/agent.go +++ b/exporters/trace/jaeger/agent.go @@ -17,7 +17,9 @@ package jaeger import ( "fmt" "io" + "log" "net" + "time" "github.com/apache/thrift/lib/go/thrift" @@ -32,41 +34,76 @@ type agentClientUDP struct { gen.Agent io.Closer - connUDP *net.UDPConn + connUDP udpConn client *gen.AgentClient maxPacketSize int // max size of datagram in bytes thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span } +type udpConn interface { + Write([]byte) (int, error) + SetWriteBuffer(int) error + Close() error +} + +type agentClientUDPParams struct { + HostPort string + MaxPacketSize int + Logger *log.Logger + AttemptReconnecting bool + AttemptReconnectInterval time.Duration +} + // newAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP. -func newAgentClientUDP(hostPort string, maxPacketSize int) (*agentClientUDP, error) { - if maxPacketSize == 0 { - maxPacketSize = udpPacketMaxLength +func newAgentClientUDP(params agentClientUDPParams) (*agentClientUDP, error) { + // validate hostport + if _, _, err := net.SplitHostPort(params.HostPort); err != nil { + return nil, err + } + + if params.MaxPacketSize <= 0 { + params.MaxPacketSize = udpPacketMaxLength } - thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize) + if params.AttemptReconnecting && params.AttemptReconnectInterval <= 0 { + params.AttemptReconnectInterval = time.Second * 30 + } + + thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize) protocolFactory := thrift.NewTCompactProtocolFactory() client := gen.NewAgentClientFactory(thriftBuffer, protocolFactory) - destAddr, err := net.ResolveUDPAddr("udp", hostPort) - if err != nil { - return nil, err - } + var connUDP udpConn + var err error - connUDP, err := net.DialUDP(destAddr.Network(), nil, destAddr) - if err != nil { - return nil, err + if params.AttemptReconnecting { + // host is hostname, setup resolver loop in case host record changes during operation + connUDP, err = newReconnectingUDPConn(params.HostPort, params.MaxPacketSize, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger) + if err != nil { + return nil, err + } + } else { + destAddr, err := net.ResolveUDPAddr("udp", params.HostPort) + if err != nil { + return nil, err + } + + connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr) + if err != nil { + return nil, err + } } - if err := connUDP.SetWriteBuffer(maxPacketSize); err != nil { + + if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil { return nil, err } - clientUDP := &agentClientUDP{ + return &agentClientUDP{ connUDP: connUDP, client: client, - maxPacketSize: maxPacketSize, - thriftBuffer: thriftBuffer} - return clientUDP, nil + maxPacketSize: params.MaxPacketSize, + thriftBuffer: thriftBuffer, + }, nil } // EmitBatch implements EmitBatch() of Agent interface diff --git a/exporters/trace/jaeger/agent_test.go b/exporters/trace/jaeger/agent_test.go new file mode 100644 index 000000000000..f449f07e9f75 --- /dev/null +++ b/exporters/trace/jaeger/agent_test.go @@ -0,0 +1,94 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package jaeger + +import ( + "log" + "net" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewAgentClientUDPWithParamsBadHostport(t *testing.T) { + hostPort := "blahblah" + + agentClient, err := newAgentClientUDP(agentClientUDPParams{ + HostPort: hostPort, + }) + + assert.Error(t, err) + assert.Nil(t, agentClient) +} + +func TestNewAgentClientUDPWithParams(t *testing.T) { + mockServer, err := newUDPListener() + require.NoError(t, err) + defer mockServer.Close() + + agentClient, err := newAgentClientUDP(agentClientUDPParams{ + HostPort: mockServer.LocalAddr().String(), + MaxPacketSize: 25000, + AttemptReconnecting: true, + }) + assert.NoError(t, err) + assert.NotNil(t, agentClient) + assert.Equal(t, 25000, agentClient.maxPacketSize) + + if assert.IsType(t, &reconnectingUDPConn{}, agentClient.connUDP) { + assert.Equal(t, (*log.Logger)(nil), agentClient.connUDP.(*reconnectingUDPConn).logger) + } + + assert.NoError(t, agentClient.Close()) +} + +func TestNewAgentClientUDPWithParamsDefaults(t *testing.T) { + mockServer, err := newUDPListener() + require.NoError(t, err) + defer mockServer.Close() + + agentClient, err := newAgentClientUDP(agentClientUDPParams{ + HostPort: mockServer.LocalAddr().String(), + AttemptReconnecting: true, + }) + assert.NoError(t, err) + assert.NotNil(t, agentClient) + assert.Equal(t, udpPacketMaxLength, agentClient.maxPacketSize) + + if assert.IsType(t, &reconnectingUDPConn{}, agentClient.connUDP) { + assert.Equal(t, (*log.Logger)(nil), agentClient.connUDP.(*reconnectingUDPConn).logger) + } + + assert.NoError(t, agentClient.Close()) +} + +func TestNewAgentClientUDPWithParamsReconnectingDisabled(t *testing.T) { + mockServer, err := newUDPListener() + require.NoError(t, err) + defer mockServer.Close() + + agentClient, err := newAgentClientUDP(agentClientUDPParams{ + HostPort: mockServer.LocalAddr().String(), + Logger: nil, + AttemptReconnecting: false, + }) + assert.NoError(t, err) + assert.NotNil(t, agentClient) + assert.Equal(t, udpPacketMaxLength, agentClient.maxPacketSize) + + assert.IsType(t, &net.UDPConn{}, agentClient.connUDP) + + assert.NoError(t, agentClient.Close()) +} diff --git a/exporters/trace/jaeger/go.sum b/exporters/trace/jaeger/go.sum index 338cb3bc0fba..dc3a8e09e61c 100644 --- a/exporters/trace/jaeger/go.sum +++ b/exporters/trace/jaeger/go.sum @@ -115,6 +115,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= diff --git a/exporters/trace/jaeger/reconnecting_udp_client.go b/exporters/trace/jaeger/reconnecting_udp_client.go new file mode 100644 index 000000000000..ab913437f8d1 --- /dev/null +++ b/exporters/trace/jaeger/reconnecting_udp_client.go @@ -0,0 +1,202 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package jaeger + +import ( + "fmt" + "log" + "net" + "sync" + "sync/atomic" + "time" +) + +// reconnectingUDPConn is an implementation of udpConn that resolves hostPort every resolveTimeout, if the resolved address is +// different than the current conn then the new address is dialed and the conn is swapped. +type reconnectingUDPConn struct { + // `sync/atomic` expects the first word in an allocated struct to be 64-bit + // aligned on both ARM and x86-32. See https://goo.gl/zW7dgq for more details. + bufferBytes int64 + hostPort string + resolveFunc resolveFunc + dialFunc dialFunc + logger *log.Logger + + connMtx sync.RWMutex + conn *net.UDPConn + destAddr *net.UDPAddr + closeChan chan struct{} +} + +type resolveFunc func(network string, hostPort string) (*net.UDPAddr, error) +type dialFunc func(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error) + +// newReconnectingUDPConn returns a new udpConn that resolves hostPort every resolveTimeout, if the resolved address is +// different than the current conn then the new address is dialed and the conn is swapped. +func newReconnectingUDPConn(hostPort string, bufferBytes int, resolveTimeout time.Duration, resolveFunc resolveFunc, dialFunc dialFunc, logger *log.Logger) (*reconnectingUDPConn, error) { + conn := &reconnectingUDPConn{ + hostPort: hostPort, + resolveFunc: resolveFunc, + dialFunc: dialFunc, + logger: logger, + closeChan: make(chan struct{}), + bufferBytes: int64(bufferBytes), + } + + if err := conn.attemptResolveAndDial(); err != nil { + conn.logf("failed resolving destination address on connection startup, with err: %q. retrying in %s", err.Error(), resolveTimeout) + } + + go conn.reconnectLoop(resolveTimeout) + + return conn, nil +} + +func (c *reconnectingUDPConn) logf(format string, args ...interface{}) { + if c.logger != nil { + c.logger.Printf(format, args...) + } +} + +func (c *reconnectingUDPConn) reconnectLoop(resolveTimeout time.Duration) { + ticker := time.NewTicker(resolveTimeout) + defer ticker.Stop() + + for { + select { + case <-c.closeChan: + return + case <-ticker.C: + if err := c.attemptResolveAndDial(); err != nil { + c.logf("%s", err.Error()) + } + } + } +} + +func (c *reconnectingUDPConn) attemptResolveAndDial() error { + newAddr, err := c.resolveFunc("udp", c.hostPort) + if err != nil { + return fmt.Errorf("failed to resolve new addr for host %q, with err: %w", c.hostPort, err) + } + + c.connMtx.RLock() + curAddr := c.destAddr + c.connMtx.RUnlock() + + // dont attempt dial if an addr was successfully dialed previously and, resolved addr is the same as current conn + if curAddr != nil && newAddr.String() == curAddr.String() { + return nil + } + + if err := c.attemptDialNewAddr(newAddr); err != nil { + return fmt.Errorf("failed to dial newly resolved addr '%s', with err: %w", newAddr, err) + } + + return nil +} + +func (c *reconnectingUDPConn) attemptDialNewAddr(newAddr *net.UDPAddr) error { + connUDP, err := c.dialFunc(newAddr.Network(), nil, newAddr) + if err != nil { + return err + } + + if bufferBytes := int(atomic.LoadInt64(&c.bufferBytes)); bufferBytes != 0 { + if err = connUDP.SetWriteBuffer(bufferBytes); err != nil { + return err + } + } + + c.connMtx.Lock() + c.destAddr = newAddr + // store prev to close later + prevConn := c.conn + c.conn = connUDP + c.connMtx.Unlock() + + if prevConn != nil { + return prevConn.Close() + } + + return nil +} + +// Write calls net.udpConn.Write, if it fails an attempt is made to connect to a new addr, if that succeeds the write is retried before returning +func (c *reconnectingUDPConn) Write(b []byte) (int, error) { + var bytesWritten int + var err error + + c.connMtx.RLock() + conn := c.conn + c.connMtx.RUnlock() + + if conn == nil { + // if connection is not initialized indicate this with err in order to hook into retry logic + err = fmt.Errorf("UDP connection not yet initialized, an address has not been resolved") + } else { + bytesWritten, err = conn.Write(b) + } + + if err == nil { + return bytesWritten, nil + } + + // attempt to resolve and dial new address in case that's the problem, if resolve and dial succeeds, try write again + if reconnErr := c.attemptResolveAndDial(); reconnErr == nil { + c.connMtx.RLock() + conn := c.conn + c.connMtx.RUnlock() + + return conn.Write(b) + } + + // return original error if reconn fails + return bytesWritten, err +} + +// Close stops the reconnectLoop, then closes the connection via net.udpConn 's implementation +func (c *reconnectingUDPConn) Close() error { + close(c.closeChan) + + // acquire rw lock before closing conn to ensure calls to Write drain + c.connMtx.Lock() + defer c.connMtx.Unlock() + + if c.conn != nil { + return c.conn.Close() + } + + return nil +} + +// SetWriteBuffer defers to the net.udpConn SetWriteBuffer implementation wrapped with a RLock. if no conn is currently held +// and SetWriteBuffer is called store bufferBytes to be set for new conns +func (c *reconnectingUDPConn) SetWriteBuffer(bytes int) error { + var err error + + c.connMtx.RLock() + conn := c.conn + c.connMtx.RUnlock() + + if conn != nil { + err = c.conn.SetWriteBuffer(bytes) + } + + if err == nil { + atomic.StoreInt64(&c.bufferBytes, int64(bytes)) + } + + return err +} diff --git a/exporters/trace/jaeger/reconnecting_udp_client_test.go b/exporters/trace/jaeger/reconnecting_udp_client_test.go new file mode 100644 index 000000000000..54060f1446c6 --- /dev/null +++ b/exporters/trace/jaeger/reconnecting_udp_client_test.go @@ -0,0 +1,462 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package jaeger + +import ( + "context" + "fmt" + "math/rand" + "net" + "runtime" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type mockResolver struct { + mock.Mock +} + +func (m *mockResolver) ResolveUDPAddr(network string, hostPort string) (*net.UDPAddr, error) { + args := m.Called(network, hostPort) + + a0 := args.Get(0) + if a0 == nil { + return (*net.UDPAddr)(nil), args.Error(1) + } + return a0.(*net.UDPAddr), args.Error(1) +} + +type mockDialer struct { + mock.Mock +} + +func (m *mockDialer) DialUDP(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error) { + args := m.Called(network, laddr, raddr) + + a0 := args.Get(0) + if a0 == nil { + return (*net.UDPConn)(nil), args.Error(1) + } + + return a0.(*net.UDPConn), args.Error(1) +} + +func newUDPListener() (net.PacketConn, error) { + return net.ListenPacket("udp", "127.0.0.1:0") +} + +func newUDPConn() (net.PacketConn, *net.UDPConn, error) { + mockServer, err := newUDPListener() + if err != nil { + return nil, nil, err + } + + addr, err := net.ResolveUDPAddr("udp", mockServer.LocalAddr().String()) + if err != nil { + mockServer.Close() + return nil, nil, err + } + + conn, err := net.DialUDP("udp", nil, addr) + if err != nil { + mockServer.Close() + return nil, nil, err + } + + return mockServer, conn, nil +} + +func assertSockBufferSize(t *testing.T, expectedBytes int, conn *net.UDPConn) bool { + fd, _ := conn.File() + bufferBytes, _ := syscall.GetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_SNDBUF) + + // The linux kernel doubles SO_SNDBUF value (to allow space for bookkeeping overhead) when it is set using setsockopt(2), and this doubled value is returned by getsockopt(2) + // https://linux.die.net/man/7/socket + if runtime.GOOS == "linux" { + return assert.GreaterOrEqual(t, expectedBytes*2, bufferBytes) + } + + return assert.Equal(t, expectedBytes, bufferBytes) +} + +func assertConnWritable(t *testing.T, conn udpConn, serverConn net.PacketConn) { + expectedString := "yo this is a test" + _, err := conn.Write([]byte(expectedString)) + require.NoError(t, err) + + var buf = make([]byte, len(expectedString)) + err = serverConn.SetReadDeadline(time.Now().Add(time.Second)) + require.NoError(t, err) + + _, _, err = serverConn.ReadFrom(buf) + require.NoError(t, err) + require.Equal(t, []byte(expectedString), buf) +} + +func waitForCallWithTimeout(call *mock.Call) bool { + called := make(chan struct{}) + call.Run(func(args mock.Arguments) { + close(called) + }) + + var wasCalled bool + // wait at most 100 milliseconds for the second call of ResolveUDPAddr that is supposed to fail + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + select { + case <-called: + wasCalled = true + case <-ctx.Done(): + fmt.Println("timed out") + } + cancel() + + return wasCalled +} + +func waitForConnCondition(conn *reconnectingUDPConn, condition func(conn *reconnectingUDPConn) bool) bool { + var conditionVal bool + for i := 0; i < 10; i++ { + conn.connMtx.RLock() + conditionVal = condition(conn) + conn.connMtx.RUnlock() + if conditionVal || i >= 9 { + break + } + + time.Sleep(time.Millisecond * 10) + } + + return conditionVal +} + +func newMockUDPAddr(t *testing.T, port int) *net.UDPAddr { + var buf = make([]byte, 4) + // random is not seeded to ensure tests are deterministic (also doesnt matter if ip is valid) + _, err := rand.Read(buf) + require.NoError(t, err) + + return &net.UDPAddr{ + IP: net.IPv4(buf[0], buf[1], buf[2], buf[3]), + Port: port, + } +} + +func TestNewResolvedUDPConn(t *testing.T) { + hostPort := "blahblah:34322" + + mockServer, clientConn, err := newUDPConn() + require.NoError(t, err) + defer mockServer.Close() + + mockUDPAddr := newMockUDPAddr(t, 34322) + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr, nil). + Once() + + dialer := mockDialer{} + dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr). + Return(clientConn, nil). + Once() + + conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Hour, resolver.ResolveUDPAddr, dialer.DialUDP, nil) + assert.NoError(t, err) + require.NotNil(t, conn) + + err = conn.Close() + assert.NoError(t, err) + + // assert the actual connection was closed + assert.Error(t, clientConn.Close()) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} + +func TestResolvedUDPConnWrites(t *testing.T) { + hostPort := "blahblah:34322" + + mockServer, clientConn, err := newUDPConn() + require.NoError(t, err) + defer mockServer.Close() + + mockUDPAddr := newMockUDPAddr(t, 34322) + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr, nil). + Once() + + dialer := mockDialer{} + dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr). + Return(clientConn, nil). + Once() + + conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Hour, resolver.ResolveUDPAddr, dialer.DialUDP, nil) + assert.NoError(t, err) + require.NotNil(t, conn) + + assertConnWritable(t, conn, mockServer) + + err = conn.Close() + assert.NoError(t, err) + + // assert the actual connection was closed + assert.Error(t, clientConn.Close()) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} + +func TestResolvedUDPConnEventuallyDials(t *testing.T) { + hostPort := "blahblah:34322" + + mockServer, clientConn, err := newUDPConn() + require.NoError(t, err) + defer mockServer.Close() + + mockUDPAddr := newMockUDPAddr(t, 34322) + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(nil, fmt.Errorf("failed to resolve")).Once(). + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr, nil) + + dialer := mockDialer{} + dialCall := dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr). + Return(clientConn, nil).Once() + + conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, nil) + assert.NoError(t, err) + require.NotNil(t, conn) + + err = conn.SetWriteBuffer(udpPacketMaxLength) + assert.NoError(t, err) + + wasCalled := waitForCallWithTimeout(dialCall) + assert.True(t, wasCalled) + + connEstablished := waitForConnCondition(conn, func(conn *reconnectingUDPConn) bool { + return conn.conn != nil + }) + + assert.True(t, connEstablished) + + assertConnWritable(t, conn, mockServer) + assertSockBufferSize(t, udpPacketMaxLength, clientConn) + + err = conn.Close() + assert.NoError(t, err) + + // assert the actual connection was closed + assert.Error(t, clientConn.Close()) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} + +func TestResolvedUDPConnNoSwapIfFail(t *testing.T) { + hostPort := "blahblah:34322" + + mockServer, clientConn, err := newUDPConn() + require.NoError(t, err) + defer mockServer.Close() + + mockUDPAddr := newMockUDPAddr(t, 34322) + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr, nil).Once() + + failCall := resolver.On("ResolveUDPAddr", "udp", hostPort). + Return(nil, fmt.Errorf("resolve failed")) + + dialer := mockDialer{} + dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr). + Return(clientConn, nil).Once() + + conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, nil) + assert.NoError(t, err) + require.NotNil(t, conn) + + wasCalled := waitForCallWithTimeout(failCall) + + assert.True(t, wasCalled) + + assertConnWritable(t, conn, mockServer) + + err = conn.Close() + assert.NoError(t, err) + + // assert the actual connection was closed + assert.Error(t, clientConn.Close()) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} + +func TestResolvedUDPConnWriteRetry(t *testing.T) { + hostPort := "blahblah:34322" + + mockServer, clientConn, err := newUDPConn() + require.NoError(t, err) + defer mockServer.Close() + + mockUDPAddr := newMockUDPAddr(t, 34322) + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(nil, fmt.Errorf("failed to resolve")).Once(). + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr, nil).Once() + + dialer := mockDialer{} + dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr). + Return(clientConn, nil).Once() + + conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, nil) + assert.NoError(t, err) + require.NotNil(t, conn) + + err = conn.SetWriteBuffer(udpPacketMaxLength) + assert.NoError(t, err) + + assertConnWritable(t, conn, mockServer) + assertSockBufferSize(t, udpPacketMaxLength, clientConn) + + err = conn.Close() + assert.NoError(t, err) + + // assert the actual connection was closed + assert.Error(t, clientConn.Close()) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} + +func TestResolvedUDPConnWriteRetryFails(t *testing.T) { + hostPort := "blahblah:34322" + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(nil, fmt.Errorf("failed to resolve")).Twice() + + dialer := mockDialer{} + + conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, nil) + assert.NoError(t, err) + require.NotNil(t, conn) + + err = conn.SetWriteBuffer(udpPacketMaxLength) + assert.NoError(t, err) + + _, err = conn.Write([]byte("yo this is a test")) + + assert.Error(t, err) + + err = conn.Close() + assert.NoError(t, err) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} + +func TestResolvedUDPConnChanges(t *testing.T) { + hostPort := "blahblah:34322" + + mockServer, clientConn, err := newUDPConn() + require.NoError(t, err) + defer mockServer.Close() + + mockUDPAddr1 := newMockUDPAddr(t, 34322) + + mockServer2, clientConn2, err := newUDPConn() + require.NoError(t, err) + defer mockServer2.Close() + + mockUDPAddr2 := newMockUDPAddr(t, 34322) + + // ensure address doesn't duplicate mockUDPAddr1 + for i := 0; i < 10 && mockUDPAddr2.IP.Equal(mockUDPAddr1.IP); i++ { + mockUDPAddr2 = newMockUDPAddr(t, 34322) + } + + // this is really unlikely to ever fail the test, but its here as a safeguard + require.False(t, mockUDPAddr2.IP.Equal(mockUDPAddr1.IP)) + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr1, nil).Once(). + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr2, nil) + + dialer := mockDialer{} + dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr1). + Return(clientConn, nil).Once() + + secondDial := dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr2). + Return(clientConn2, nil).Once() + + conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, nil) + assert.NoError(t, err) + require.NotNil(t, conn) + + err = conn.SetWriteBuffer(udpPacketMaxLength) + assert.NoError(t, err) + + wasCalled := waitForCallWithTimeout(secondDial) + assert.True(t, wasCalled) + + connSwapped := waitForConnCondition(conn, func(conn *reconnectingUDPConn) bool { + return conn.conn == clientConn2 + }) + + assert.True(t, connSwapped) + + assertConnWritable(t, conn, mockServer2) + assertSockBufferSize(t, udpPacketMaxLength, clientConn2) + + err = conn.Close() + assert.NoError(t, err) + + // assert the prev connection was closed + assert.Error(t, clientConn.Close()) + + // assert the actual connection was closed + assert.Error(t, clientConn2.Close()) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} diff --git a/exporters/trace/jaeger/uploader.go b/exporters/trace/jaeger/uploader.go index 3f3f848c55f5..f79678a24449 100644 --- a/exporters/trace/jaeger/uploader.go +++ b/exporters/trace/jaeger/uploader.go @@ -20,7 +20,9 @@ import ( "fmt" "io" "io/ioutil" + "log" "net/http" + "time" "github.com/apache/thrift/lib/go/thrift" @@ -36,13 +38,24 @@ type EndpointOption func() (batchUploader, error) // WithAgentEndpoint instructs exporter to send spans to jaeger-agent at this address. // For example, localhost:6831. -func WithAgentEndpoint(agentEndpoint string) EndpointOption { +func WithAgentEndpoint(agentEndpoint string, options ...AgentEndpointOption) EndpointOption { return func() (batchUploader, error) { if agentEndpoint == "" { return nil, errors.New("agentEndpoint must not be empty") } - client, err := newAgentClientUDP(agentEndpoint, udpPacketMaxLength) + o := &AgentEndpointOptions{ + agentClientUDPParams{ + HostPort: agentEndpoint, + AttemptReconnecting: true, + }, + } + + for _, opt := range options { + opt(o) + } + + client, err := newAgentClientUDP(o.agentClientUDPParams) if err != nil { return nil, err } @@ -51,6 +64,33 @@ func WithAgentEndpoint(agentEndpoint string) EndpointOption { } } +type AgentEndpointOption func(o *AgentEndpointOptions) + +type AgentEndpointOptions struct { + agentClientUDPParams +} + +// WithLogger sets a logger to be used by agent client. +func WithLogger(logger *log.Logger) AgentEndpointOption { + return func(o *AgentEndpointOptions) { + o.Logger = logger + } +} + +// WithDisableAttemptReconnecting sets option to disable reconnecting udp client. +func WithDisableAttemptReconnecting() AgentEndpointOption { + return func(o *AgentEndpointOptions) { + o.AttemptReconnecting = false + } +} + +// WithAttemptReconnectingInterval sets the interval between attempts to re resolve agent endpoint. +func WithAttemptReconnectingInterval(interval time.Duration) AgentEndpointOption { + return func(o *AgentEndpointOptions) { + o.AttemptReconnectInterval = interval + } +} + // WithCollectorEndpoint defines the full url to the Jaeger HTTP Thrift collector. // For example, http://localhost:14268/api/traces func WithCollectorEndpoint(collectorEndpoint string, options ...CollectorEndpointOption) EndpointOption {