diff --git a/Gopkg.lock b/Gopkg.lock index 2a5215a5..387958b1 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -142,10 +142,19 @@ version = "v0.0.5" [[projects]] - digest = "1:0496f0e99014b7fd0a560c539f51d0882731137b85494142f47e550e4657176a" + digest = "1:ac83cf90d08b63ad5f7e020ef480d319ae890c208f8524622a2f3136e2686b02" + name = "github.com/stretchr/objx" + packages = ["."] + pruneopts = "UT" + revision = "477a77ecc69700c7cdeb1fa9e129548e1c1c393c" + version = "v0.1.1" + +[[projects]] + digest = "1:d88ba57c4e8f5db6ce9ab6605a89f4542ee751b576884ba5271c2ba3d4b6f2d2" name = "github.com/stretchr/testify" packages = [ "assert", + "mock", "require", "suite", ] @@ -153,6 +162,42 @@ revision = "221dbe5ed46703ee255b1da0dec05086f5035f62" version = "v1.4.0" +[[projects]] + digest = "1:5b98956718573850caf7e0fd00b571a6657c4ef1f345ddf0c96b43ce355fe862" + name = "github.com/uber/jaeger-client-go" + packages = [ + ".", + "config", + "crossdock/client", + "crossdock/common", + "crossdock/endtoend", + "crossdock/log", + "crossdock/server", + "crossdock/thrift/tracetest", + "internal/baggage", + "internal/baggage/remote", + "internal/reporterstats", + "internal/spanlog", + "internal/throttler", + "internal/throttler/remote", + "log", + "log/zap/mock_opentracing", + "rpcmetrics", + "testutils", + "thrift", + "thrift-gen/agent", + "thrift-gen/baggage", + "thrift-gen/jaeger", + "thrift-gen/sampling", + "thrift-gen/zipkincore", + "transport", + "transport/zipkin", + "utils", + ] + pruneopts = "UT" + revision = "66c008c3d6ad856cac92a0af53186efbffa8e6a5" + version = "v2.24.0" + [[projects]] digest = "1:0ec60ffd594af00ba1660bc746aa0e443d27dd4003dee55f9d08a0b4ff5431a3" name = "github.com/uber/jaeger-lib" @@ -314,8 +359,36 @@ "github.com/pkg/errors", "github.com/prometheus/client_golang/prometheus", "github.com/stretchr/testify/assert", + "github.com/stretchr/testify/mock", "github.com/stretchr/testify/require", "github.com/stretchr/testify/suite", + "github.com/uber/jaeger-client-go", + "github.com/uber/jaeger-client-go/config", + "github.com/uber/jaeger-client-go/crossdock/client", + "github.com/uber/jaeger-client-go/crossdock/common", + "github.com/uber/jaeger-client-go/crossdock/endtoend", + "github.com/uber/jaeger-client-go/crossdock/log", + "github.com/uber/jaeger-client-go/crossdock/server", + "github.com/uber/jaeger-client-go/crossdock/thrift/tracetest", + "github.com/uber/jaeger-client-go/internal/baggage", + "github.com/uber/jaeger-client-go/internal/baggage/remote", + "github.com/uber/jaeger-client-go/internal/reporterstats", + "github.com/uber/jaeger-client-go/internal/spanlog", + "github.com/uber/jaeger-client-go/internal/throttler", + "github.com/uber/jaeger-client-go/internal/throttler/remote", + "github.com/uber/jaeger-client-go/log", + "github.com/uber/jaeger-client-go/log/zap/mock_opentracing", + "github.com/uber/jaeger-client-go/rpcmetrics", + "github.com/uber/jaeger-client-go/testutils", + "github.com/uber/jaeger-client-go/thrift", + "github.com/uber/jaeger-client-go/thrift-gen/agent", + "github.com/uber/jaeger-client-go/thrift-gen/baggage", + "github.com/uber/jaeger-client-go/thrift-gen/jaeger", + "github.com/uber/jaeger-client-go/thrift-gen/sampling", + "github.com/uber/jaeger-client-go/thrift-gen/zipkincore", + "github.com/uber/jaeger-client-go/transport", + "github.com/uber/jaeger-client-go/transport/zipkin", + "github.com/uber/jaeger-client-go/utils", "github.com/uber/jaeger-lib/metrics", "github.com/uber/jaeger-lib/metrics/metricstest", "github.com/uber/jaeger-lib/metrics/prometheus", diff --git a/README.md b/README.md index e7b13b1c..687f5780 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,8 @@ JAEGER_PASSWORD | Password to send as part of "Basic" authentication to the coll JAEGER_REPORTER_LOG_SPANS | Whether the reporter should also log the spans" `true` or `false` (default `false`). JAEGER_REPORTER_MAX_QUEUE_SIZE | The reporter's maximum queue size (default `100`). JAEGER_REPORTER_FLUSH_INTERVAL | The reporter's flush interval, with units, e.g. `500ms` or `2s` ([valid units][timeunits]; default `1s`). +JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED | When true, disables udp connection helper that periodically re-resolves the agent's hostname and reconnects if there was a change (default `false`). +JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL | Controls how often the agent client re-resolves the provided hostname in order to detect address changes ([valid units][timeunits]; default `30s`). JAEGER_SAMPLER_TYPE | The sampler type: `remote`, `const`, `probabilistic`, `ratelimiting` (default `remote`). See also https://www.jaegertracing.io/docs/latest/sampling/. JAEGER_SAMPLER_PARAM | The sampler parameter (number). JAEGER_SAMPLER_MANAGER_HOST_PORT | (deprecated) The HTTP endpoint when using the `remote` sampler. diff --git a/config/config.go b/config/config.go index e6ffb987..bb122829 100644 --- a/config/config.go +++ b/config/config.go @@ -22,6 +22,7 @@ import ( "time" "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/utils" "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/internal/baggage/remote" @@ -124,6 +125,17 @@ type ReporterConfig struct { // Can be provided by FromEnv() via the environment variable named JAEGER_AGENT_HOST / JAEGER_AGENT_PORT LocalAgentHostPort string `yaml:"localAgentHostPort"` + // DisableAttemptReconnecting when true, disables udp connection helper that periodically re-resolves + // the agent's hostname and reconnects if there was a change. This option only + // applies if LocalAgentHostPort is specified. + // Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED + DisableAttemptReconnecting bool `yaml:"disableAttemptReconnecting"` + + // AttemptReconnectInterval controls how often the agent client re-resolves the provided hostname + // in order to detect address changes. This option only applies if DisableAttemptReconnecting is false. + // Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL + AttemptReconnectInterval time.Duration + // CollectorEndpoint instructs reporter to send spans to jaeger-collector at this URL. // Can be provided by FromEnv() via the environment variable named JAEGER_ENDPOINT CollectorEndpoint string `yaml:"collectorEndpoint"` @@ -384,7 +396,7 @@ func (rc *ReporterConfig) NewReporter( metrics *jaeger.Metrics, logger jaeger.Logger, ) (jaeger.Reporter, error) { - sender, err := rc.newTransport() + sender, err := rc.newTransport(logger) if err != nil { return nil, err } @@ -401,7 +413,7 @@ func (rc *ReporterConfig) NewReporter( return reporter, err } -func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) { +func (rc *ReporterConfig) newTransport(logger jaeger.Logger) (jaeger.Transport, error) { switch { case rc.CollectorEndpoint != "": httpOptions := []transport.HTTPOption{transport.HTTPBatchSize(1), transport.HTTPHeaders(rc.HTTPHeaders)} @@ -410,6 +422,13 @@ func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) { } return transport.NewHTTPTransport(rc.CollectorEndpoint, httpOptions...), nil default: - return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0) + return jaeger.NewUDPTransportWithParams(jaeger.UDPTransportParams{ + AgentClientUDPParams: utils.AgentClientUDPParams{ + HostPort: rc.LocalAgentHostPort, + Logger: logger, + DisableAttemptReconnecting: rc.DisableAttemptReconnecting, + AttemptReconnectInterval: rc.AttemptReconnectInterval, + }, + }) } } diff --git a/config/config_env.go b/config/config_env.go index f38eb9d9..92d60cd5 100644 --- a/config/config_env.go +++ b/config/config_env.go @@ -24,30 +24,31 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" - "github.com/uber/jaeger-client-go" ) const ( // environment variable names - envServiceName = "JAEGER_SERVICE_NAME" - envDisabled = "JAEGER_DISABLED" - envRPCMetrics = "JAEGER_RPC_METRICS" - envTags = "JAEGER_TAGS" - envSamplerType = "JAEGER_SAMPLER_TYPE" - envSamplerParam = "JAEGER_SAMPLER_PARAM" - envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint - envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT" - envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS" - envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL" - envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE" - envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL" - envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS" - envEndpoint = "JAEGER_ENDPOINT" - envUser = "JAEGER_USER" - envPassword = "JAEGER_PASSWORD" - envAgentHost = "JAEGER_AGENT_HOST" - envAgentPort = "JAEGER_AGENT_PORT" + envServiceName = "JAEGER_SERVICE_NAME" + envDisabled = "JAEGER_DISABLED" + envRPCMetrics = "JAEGER_RPC_METRICS" + envTags = "JAEGER_TAGS" + envSamplerType = "JAEGER_SAMPLER_TYPE" + envSamplerParam = "JAEGER_SAMPLER_PARAM" + envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint + envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT" + envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS" + envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL" + envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE" + envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL" + envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS" + envReporterAttemptReconnectingDisabled = "JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED" + envReporterAttemptReconnectInterval = "JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL" + envEndpoint = "JAEGER_ENDPOINT" + envUser = "JAEGER_USER" + envPassword = "JAEGER_PASSWORD" + envAgentHost = "JAEGER_AGENT_HOST" + envAgentPort = "JAEGER_AGENT_PORT" ) // FromEnv uses environment variables to set the tracer's Configuration @@ -206,6 +207,24 @@ func (rc *ReporterConfig) reporterConfigFromEnv() (*ReporterConfig, error) { if useEnv || rc.LocalAgentHostPort == "" { rc.LocalAgentHostPort = fmt.Sprintf("%s:%d", host, port) } + + if e := os.Getenv(envReporterAttemptReconnectingDisabled); e != "" { + if value, err := strconv.ParseBool(e); err == nil { + rc.DisableAttemptReconnecting = value + } else { + return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectingDisabled, e) + } + } + + if !rc.DisableAttemptReconnecting { + if e := os.Getenv(envReporterAttemptReconnectInterval); e != "" { + if value, err := time.ParseDuration(e); err == nil { + rc.AttemptReconnectInterval = value + } else { + return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectInterval, e) + } + } + } } return rc, nil diff --git a/config/config_test.go b/config/config_test.go index c273d5c8..6f97d0a6 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -202,6 +202,8 @@ func TestReporter(t *testing.T) { setEnv(t, envAgentPort, "6832") setEnv(t, envUser, "user") setEnv(t, envPassword, "password") + setEnv(t, envReporterAttemptReconnectingDisabled, "false") + setEnv(t, envReporterAttemptReconnectInterval, "40s") // Existing ReporterConfig data rc := ReporterConfig{ @@ -225,6 +227,8 @@ func TestReporter(t *testing.T) { assert.Equal(t, "nonlocalhost:6832", cfg.LocalAgentHostPort) assert.Equal(t, "user01", cfg.User) assert.Equal(t, "password01", cfg.Password) + assert.Equal(t, false, cfg.DisableAttemptReconnecting) + assert.Equal(t, time.Second*40, cfg.AttemptReconnectInterval) // Prepare setEnv(t, envEndpoint, "http://1.2.3.4:5678/api/traces") @@ -561,7 +565,7 @@ func TestInvalidSamplerType(t *testing.T) { func TestUDPTransportType(t *testing.T) { rc := &ReporterConfig{LocalAgentHostPort: "localhost:1234"} expect, _ := jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0) - sender, err := rc.newTransport() + sender, err := rc.newTransport(log.NullLogger) require.NoError(t, err) require.IsType(t, expect, sender) } @@ -569,7 +573,7 @@ func TestUDPTransportType(t *testing.T) { func TestHTTPTransportType(t *testing.T) { rc := &ReporterConfig{CollectorEndpoint: "http://1.2.3.4:5678/api/traces"} expect := transport.NewHTTPTransport(rc.CollectorEndpoint) - sender, err := rc.newTransport() + sender, err := rc.newTransport(log.NullLogger) require.NoError(t, err) require.IsType(t, expect, sender) } @@ -581,7 +585,7 @@ func TestHTTPTransportTypeWithAuth(t *testing.T) { Password: "auth_pass", } expect := transport.NewHTTPTransport(rc.CollectorEndpoint) - sender, err := rc.newTransport() + sender, err := rc.newTransport(log.NullLogger) require.NoError(t, err) require.IsType(t, expect, sender) } diff --git a/transport/http_test.go b/transport/http_test.go index 3bcf1bac..b42b5f9e 100644 --- a/transport/http_test.go +++ b/transport/http_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/uber/jaeger-client-go/thrift" "github.com/uber/jaeger-client-go" @@ -167,7 +168,9 @@ func newHTTPServer(t *testing.T) *httpServer { }) go func() { - http.ListenAndServe(":10001", nil) + if err := http.ListenAndServe(":10001", nil); err != nil && err != http.ErrServerClosed { + require.NoError(t, err) + } }() return server diff --git a/transport_udp.go b/transport_udp.go index 7370d800..5734819a 100644 --- a/transport_udp.go +++ b/transport_udp.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/uber/jaeger-client-go/internal/reporterstats" + "github.com/uber/jaeger-client-go/log" "github.com/uber/jaeger-client-go/thrift" j "github.com/uber/jaeger-client-go/thrift-gen/jaeger" "github.com/uber/jaeger-client-go/utils" @@ -57,35 +58,57 @@ type udpSender struct { failedToEmitSpans int64 } -// NewUDPTransport creates a reporter that submits spans to jaeger-agent. +// UDPTransportParams allows specifying options for initializing a UDPTransport. An instance of this struct should +// be passed to NewUDPTransportWithParams. +type UDPTransportParams struct { + utils.AgentClientUDPParams +} + +// NewUDPTransportWithParams creates a reporter that submits spans to jaeger-agent. // TODO: (breaking change) move to transport/ package. -func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) { - if len(hostPort) == 0 { - hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort) +func NewUDPTransportWithParams(params UDPTransportParams) (Transport, error) { + if len(params.HostPort) == 0 { + params.HostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort) } - if maxPacketSize == 0 { - maxPacketSize = utils.UDPPacketMaxLength + + if params.Logger == nil { + params.Logger = log.StdLogger + } + + if params.MaxPacketSize == 0 { + params.MaxPacketSize = utils.UDPPacketMaxLength } protocolFactory := thrift.NewTCompactProtocolFactory() // Each span is first written to thriftBuffer to determine its size in bytes. - thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize) + thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize) thriftProtocol := protocolFactory.GetProtocol(thriftBuffer) - client, err := utils.NewAgentClientUDP(hostPort, maxPacketSize) + client, err := utils.NewAgentClientUDPWithParams(params.AgentClientUDPParams) if err != nil { return nil, err } return &udpSender{ client: client, - maxSpanBytes: maxPacketSize - emitBatchOverhead, + maxSpanBytes: params.MaxPacketSize - emitBatchOverhead, thriftBuffer: thriftBuffer, thriftProtocol: thriftProtocol, }, nil } +// NewUDPTransport creates a reporter that submits spans to jaeger-agent. +// TODO: (breaking change) move to transport/ package. +func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) { + return NewUDPTransportWithParams(UDPTransportParams{ + AgentClientUDPParams: utils.AgentClientUDPParams{ + HostPort: hostPort, + MaxPacketSize: maxPacketSize, + }, + }) +} + // SetReporterStats implements reporterstats.Receiver. func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) { s.reporterStats = rs diff --git a/utils/reconnecting_udp_conn.go b/utils/reconnecting_udp_conn.go new file mode 100644 index 00000000..0dffc7fa --- /dev/null +++ b/utils/reconnecting_udp_conn.go @@ -0,0 +1,189 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/uber/jaeger-client-go/log" +) + +// reconnectingUDPConn is an implementation of udpConn that resolves hostPort every resolveTimeout, if the resolved address is +// different than the current conn then the new address is dialed and the conn is swapped. +type reconnectingUDPConn struct { + hostPort string + resolveFunc resolveFunc + dialFunc dialFunc + logger log.Logger + bufferBytes int64 + + connMtx sync.RWMutex + conn *net.UDPConn + destAddr *net.UDPAddr + closeChan chan struct{} +} + +type resolveFunc func(network string, hostPort string) (*net.UDPAddr, error) +type dialFunc func(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error) + +// newReconnectingUDPConn returns a new udpConn that resolves hostPort every resolveTimeout, if the resolved address is +// different than the current conn then the new address is dialed and the conn is swapped. +func newReconnectingUDPConn(hostPort string, resolveTimeout time.Duration, resolveFunc resolveFunc, dialFunc dialFunc, logger log.Logger) (*reconnectingUDPConn, error) { + conn := &reconnectingUDPConn{ + hostPort: hostPort, + resolveFunc: resolveFunc, + dialFunc: dialFunc, + logger: logger, + closeChan: make(chan struct{}), + } + + if err := conn.attemptResolveAndDial(); err != nil { + logger.Error(fmt.Sprintf("failed resolving destination address on connection startup, with err: %q. retrying in %s", err.Error(), resolveTimeout)) + } + + go conn.reconnectLoop(resolveTimeout) + + return conn, nil +} + +func (c *reconnectingUDPConn) reconnectLoop(resolveTimeout time.Duration) { + ticker := time.NewTicker(resolveTimeout) + defer ticker.Stop() + + for { + select { + case <-c.closeChan: + return + case <-ticker.C: + if err := c.attemptResolveAndDial(); err != nil { + c.logger.Error(err.Error()) + } + } + } +} + +func (c *reconnectingUDPConn) attemptResolveAndDial() error { + newAddr, err := c.resolveFunc("udp", c.hostPort) + if err != nil { + return fmt.Errorf("failed to resolve new addr for host %q, with err: %w", c.hostPort, err) + } + + c.connMtx.RLock() + curAddr := c.destAddr + c.connMtx.RUnlock() + + // dont attempt dial if an addr was successfully dialed previously and, resolved addr is the same as current conn + if curAddr != nil && newAddr.String() == curAddr.String() { + return nil + } + + if err := c.attemptDialNewAddr(newAddr); err != nil { + return fmt.Errorf("failed to dial newly resolved addr '%s', with err: %w", newAddr, err) + } + + return nil +} + +func (c *reconnectingUDPConn) attemptDialNewAddr(newAddr *net.UDPAddr) error { + connUDP, err := c.dialFunc(newAddr.Network(), nil, newAddr) + if err != nil { + return err + } + + if bufferBytes := int(atomic.LoadInt64(&c.bufferBytes)); bufferBytes != 0 { + if err = connUDP.SetWriteBuffer(bufferBytes); err != nil { + return err + } + } + + c.connMtx.Lock() + c.destAddr = newAddr + // store prev to close later + prevConn := c.conn + c.conn = connUDP + c.connMtx.Unlock() + + if prevConn != nil { + return prevConn.Close() + } + + return nil +} + +// Write calls net.udpConn.Write, if it fails an attempt is made to connect to a new addr, if that succeeds the write is retried before returning +func (c *reconnectingUDPConn) Write(b []byte) (int, error) { + var bytesWritten int + var err error + + c.connMtx.RLock() + if c.conn == nil { + // if connection is not initialized indicate this with err in order to hook into retry logic + err = fmt.Errorf("UDP connection not yet initialized, an address has not been resolved") + } else { + bytesWritten, err = c.conn.Write(b) + } + c.connMtx.RUnlock() + + if err == nil { + return bytesWritten, nil + } + + // attempt to resolve and dial new address in case that's the problem, if resolve and dial succeeds, try write again + if reconnErr := c.attemptResolveAndDial(); reconnErr == nil { + c.connMtx.RLock() + defer c.connMtx.RUnlock() + return c.conn.Write(b) + } + + // return original error if reconn fails + return bytesWritten, err +} + +// Close stops the reconnectLoop, then closes the connection via net.udpConn 's implementation +func (c *reconnectingUDPConn) Close() error { + close(c.closeChan) + + // acquire rw lock before closing conn to ensure calls to Write drain + c.connMtx.Lock() + defer c.connMtx.Unlock() + + if c.conn != nil { + return c.conn.Close() + } + + return nil +} + +// SetWriteBuffer defers to the net.udpConn SetWriteBuffer implementation wrapped with a RLock. if no conn is currently held +// and SetWriteBuffer is called store bufferBytes to be set for new conns +func (c *reconnectingUDPConn) SetWriteBuffer(bytes int) error { + var err error + + c.connMtx.RLock() + if c.conn != nil { + err = c.conn.SetWriteBuffer(bytes) + } + c.connMtx.RUnlock() + + if err == nil { + atomic.StoreInt64(&c.bufferBytes, int64(bytes)) + } + + return err +} diff --git a/utils/reconnecting_udp_conn_test.go b/utils/reconnecting_udp_conn_test.go new file mode 100644 index 00000000..62817430 --- /dev/null +++ b/utils/reconnecting_udp_conn_test.go @@ -0,0 +1,468 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "context" + "fmt" + "math/rand" + "net" + "runtime" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/uber/jaeger-client-go/log" +) + +type mockResolver struct { + mock.Mock +} + +func (m *mockResolver) ResolveUDPAddr(network string, hostPort string) (*net.UDPAddr, error) { + args := m.Called(network, hostPort) + + a0 := args.Get(0) + if a0 == nil { + return (*net.UDPAddr)(nil), args.Error(1) + } + return a0.(*net.UDPAddr), args.Error(1) +} + +type mockDialer struct { + mock.Mock +} + +func (m *mockDialer) DialUDP(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error) { + args := m.Called(network, laddr, raddr) + + a0 := args.Get(0) + if a0 == nil { + return (*net.UDPConn)(nil), args.Error(1) + } + + return a0.(*net.UDPConn), args.Error(1) +} + +func newUDPListener() (net.PacketConn, error) { + return net.ListenPacket("udp", "127.0.0.1:0") +} + +func newUDPListenerOnPort(port int) (net.PacketConn, error) { + return net.ListenPacket("udp", fmt.Sprintf("127.0.0.1:%d", port)) +} + +func newUDPConn() (net.PacketConn, *net.UDPConn, error) { + mockServer, err := newUDPListener() + if err != nil { + return nil, nil, err + } + + addr, err := net.ResolveUDPAddr("udp", mockServer.LocalAddr().String()) + if err != nil { + mockServer.Close() + return nil, nil, err + } + + conn, err := net.DialUDP("udp", nil, addr) + if err != nil { + mockServer.Close() + return nil, nil, err + } + + return mockServer, conn, nil +} + +func assertSockBufferSize(t *testing.T, expectedBytes int, conn *net.UDPConn) bool { + fd, _ := conn.File() + bufferBytes, _ := syscall.GetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_SNDBUF) + + // The linux kernel doubles SO_SNDBUF value (to allow space for bookkeeping overhead) when it is set using setsockopt(2), and this doubled value is returned by getsockopt(2) + // https://linux.die.net/man/7/socket + if runtime.GOOS == "linux" { + return assert.GreaterOrEqual(t, expectedBytes*2, bufferBytes) + } + + return assert.Equal(t, expectedBytes, bufferBytes) +} + +func assertConnWritable(t *testing.T, conn udpConn, serverConn net.PacketConn) { + expectedString := "yo this is a test" + _, err := conn.Write([]byte(expectedString)) + require.NoError(t, err) + + var buf = make([]byte, len(expectedString)) + err = serverConn.SetReadDeadline(time.Now().Add(time.Second)) + require.NoError(t, err) + + _, _, err = serverConn.ReadFrom(buf) + require.NoError(t, err) + require.Equal(t, []byte(expectedString), buf) +} + +func waitForCallWithTimeout(call *mock.Call) bool { + called := make(chan struct{}) + call.Run(func(args mock.Arguments) { + close(called) + }) + + var wasCalled bool + // wait at most 100 milliseconds for the second call of ResolveUDPAddr that is supposed to fail + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + select { + case <-called: + wasCalled = true + case <-ctx.Done(): + fmt.Println("timed out") + } + cancel() + + return wasCalled +} + +func waitForConnCondition(conn *reconnectingUDPConn, condition func(conn *reconnectingUDPConn) bool) bool { + var conditionVal bool + for i := 0; i < 10; i++ { + conn.connMtx.RLock() + conditionVal = condition(conn) + conn.connMtx.RUnlock() + if conditionVal || i >= 9 { + break + } + + time.Sleep(time.Millisecond * 10) + } + + return conditionVal +} + +func newMockUDPAddr(t *testing.T, port int) *net.UDPAddr { + var buf = make([]byte, 4) + // random is not seeded to ensure tests are deterministic (also doesnt matter if ip is valid) + _, err := rand.Read(buf) + require.NoError(t, err) + + return &net.UDPAddr{ + IP: net.IPv4(buf[0], buf[1], buf[2], buf[3]), + Port: port, + } +} + +func TestNewResolvedUDPConn(t *testing.T) { + hostPort := "blahblah:34322" + + mockServer, clientConn, err := newUDPConn() + require.NoError(t, err) + defer mockServer.Close() + + mockUDPAddr := newMockUDPAddr(t, 34322) + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr, nil). + Once() + + dialer := mockDialer{} + dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr). + Return(clientConn, nil). + Once() + + conn, err := newReconnectingUDPConn(hostPort, time.Hour, resolver.ResolveUDPAddr, dialer.DialUDP, log.NullLogger) + assert.NoError(t, err) + require.NotNil(t, conn) + + err = conn.Close() + assert.NoError(t, err) + + // assert the actual connection was closed + assert.Error(t, clientConn.Close()) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} + +func TestResolvedUDPConnWrites(t *testing.T) { + hostPort := "blahblah:34322" + + mockServer, clientConn, err := newUDPConn() + require.NoError(t, err) + defer mockServer.Close() + + mockUDPAddr := newMockUDPAddr(t, 34322) + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr, nil). + Once() + + dialer := mockDialer{} + dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr). + Return(clientConn, nil). + Once() + + conn, err := newReconnectingUDPConn(hostPort, time.Hour, resolver.ResolveUDPAddr, dialer.DialUDP, log.NullLogger) + assert.NoError(t, err) + require.NotNil(t, conn) + + assertConnWritable(t, conn, mockServer) + + err = conn.Close() + assert.NoError(t, err) + + // assert the actual connection was closed + assert.Error(t, clientConn.Close()) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} + +func TestResolvedUDPConnEventuallyDials(t *testing.T) { + hostPort := "blahblah:34322" + + mockServer, clientConn, err := newUDPConn() + require.NoError(t, err) + defer mockServer.Close() + + mockUDPAddr := newMockUDPAddr(t, 34322) + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(nil, fmt.Errorf("failed to resolve")).Once(). + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr, nil) + + dialer := mockDialer{} + dialCall := dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr). + Return(clientConn, nil).Once() + + conn, err := newReconnectingUDPConn(hostPort, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, log.NullLogger) + assert.NoError(t, err) + require.NotNil(t, conn) + + err = conn.SetWriteBuffer(UDPPacketMaxLength) + assert.NoError(t, err) + + wasCalled := waitForCallWithTimeout(dialCall) + assert.True(t, wasCalled) + + connEstablished := waitForConnCondition(conn, func(conn *reconnectingUDPConn) bool { + return conn.conn != nil + }) + + assert.True(t, connEstablished) + + assertConnWritable(t, conn, mockServer) + assertSockBufferSize(t, UDPPacketMaxLength, clientConn) + + err = conn.Close() + assert.NoError(t, err) + + // assert the actual connection was closed + assert.Error(t, clientConn.Close()) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} + +func TestResolvedUDPConnNoSwapIfFail(t *testing.T) { + hostPort := "blahblah:34322" + + mockServer, clientConn, err := newUDPConn() + require.NoError(t, err) + defer mockServer.Close() + + mockUDPAddr := newMockUDPAddr(t, 34322) + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr, nil).Once() + + failCall := resolver.On("ResolveUDPAddr", "udp", hostPort). + Return(nil, fmt.Errorf("resolve failed")) + + dialer := mockDialer{} + dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr). + Return(clientConn, nil).Once() + + conn, err := newReconnectingUDPConn(hostPort, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, log.NullLogger) + assert.NoError(t, err) + require.NotNil(t, conn) + + wasCalled := waitForCallWithTimeout(failCall) + + assert.True(t, wasCalled) + + assertConnWritable(t, conn, mockServer) + + err = conn.Close() + assert.NoError(t, err) + + // assert the actual connection was closed + assert.Error(t, clientConn.Close()) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} + +func TestResolvedUDPConnWriteRetry(t *testing.T) { + hostPort := "blahblah:34322" + + mockServer, clientConn, err := newUDPConn() + require.NoError(t, err) + defer mockServer.Close() + + mockUDPAddr := newMockUDPAddr(t, 34322) + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(nil, fmt.Errorf("failed to resolve")).Once(). + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr, nil).Once() + + dialer := mockDialer{} + dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr). + Return(clientConn, nil).Once() + + conn, err := newReconnectingUDPConn(hostPort, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, log.NullLogger) + assert.NoError(t, err) + require.NotNil(t, conn) + + err = conn.SetWriteBuffer(UDPPacketMaxLength) + assert.NoError(t, err) + + assertConnWritable(t, conn, mockServer) + assertSockBufferSize(t, UDPPacketMaxLength, clientConn) + + err = conn.Close() + assert.NoError(t, err) + + // assert the actual connection was closed + assert.Error(t, clientConn.Close()) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} + +func TestResolvedUDPConnWriteRetryFails(t *testing.T) { + hostPort := "blahblah:34322" + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(nil, fmt.Errorf("failed to resolve")).Twice() + + dialer := mockDialer{} + + conn, err := newReconnectingUDPConn(hostPort, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, log.NullLogger) + assert.NoError(t, err) + require.NotNil(t, conn) + + err = conn.SetWriteBuffer(UDPPacketMaxLength) + assert.NoError(t, err) + + _, err = conn.Write([]byte("yo this is a test")) + + assert.Error(t, err) + + err = conn.Close() + assert.NoError(t, err) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} + +func TestResolvedUDPConnChanges(t *testing.T) { + hostPort := "blahblah:34322" + + mockServer, clientConn, err := newUDPConn() + require.NoError(t, err) + defer mockServer.Close() + + mockUDPAddr1 := newMockUDPAddr(t, 34322) + + mockServer2, clientConn2, err := newUDPConn() + require.NoError(t, err) + defer mockServer2.Close() + + mockUDPAddr2 := newMockUDPAddr(t, 34322) + + // ensure address doesn't duplicate mockUDPAddr1 + for i := 0; i < 10 && mockUDPAddr2.IP.Equal(mockUDPAddr1.IP); i++ { + mockUDPAddr2 = newMockUDPAddr(t, 34322) + } + + // this is really unlikely to ever fail the test, but its here as a safeguard + require.False(t, mockUDPAddr2.IP.Equal(mockUDPAddr1.IP)) + + resolver := mockResolver{} + resolver. + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr1, nil).Once(). + On("ResolveUDPAddr", "udp", hostPort). + Return(mockUDPAddr2, nil) + + dialer := mockDialer{} + dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr1). + Return(clientConn, nil).Once() + + secondDial := dialer. + On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr2). + Return(clientConn2, nil).Once() + + conn, err := newReconnectingUDPConn(hostPort, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, log.NullLogger) + assert.NoError(t, err) + require.NotNil(t, conn) + + err = conn.SetWriteBuffer(UDPPacketMaxLength) + assert.NoError(t, err) + + wasCalled := waitForCallWithTimeout(secondDial) + assert.True(t, wasCalled) + + connSwapped := waitForConnCondition(conn, func(conn *reconnectingUDPConn) bool { + return conn.conn == clientConn2 + }) + + assert.True(t, connSwapped) + + assertConnWritable(t, conn, mockServer2) + assertSockBufferSize(t, UDPPacketMaxLength, clientConn2) + + err = conn.Close() + assert.NoError(t, err) + + // assert the prev connection was closed + assert.Error(t, clientConn.Close()) + + // assert the actual connection was closed + assert.Error(t, clientConn2.Close()) + + resolver.AssertExpectations(t) + dialer.AssertExpectations(t) +} diff --git a/utils/udp_client.go b/utils/udp_client.go index fadd73e4..2352643c 100644 --- a/utils/udp_client.go +++ b/utils/udp_client.go @@ -19,7 +19,9 @@ import ( "fmt" "io" "net" + "time" + "github.com/uber/jaeger-client-go/log" "github.com/uber/jaeger-client-go/thrift" "github.com/uber/jaeger-client-go/thrift-gen/agent" @@ -35,41 +37,90 @@ type AgentClientUDP struct { agent.Agent io.Closer - connUDP *net.UDPConn + connUDP udpConn client *agent.AgentClient maxPacketSize int // max size of datagram in bytes thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span } -// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP. -func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) { - if maxPacketSize == 0 { - maxPacketSize = UDPPacketMaxLength +type udpConn interface { + Write([]byte) (int, error) + SetWriteBuffer(int) error + Close() error +} + +// AgentClientUDPParams allows specifying options for initializing an AgentClientUDP. An instance of this struct should +// be passed to NewAgentClientUDPWithParams. +type AgentClientUDPParams struct { + HostPort string + MaxPacketSize int + Logger log.Logger + DisableAttemptReconnecting bool + AttemptReconnectInterval time.Duration +} + +// NewAgentClientUDPWithParams creates a client that sends spans to Jaeger Agent over UDP. +func NewAgentClientUDPWithParams(params AgentClientUDPParams) (*AgentClientUDP, error) { + // validate hostport + if _, _, err := net.SplitHostPort(params.HostPort); err != nil { + return nil, err + } + + if params.MaxPacketSize == 0 { + params.MaxPacketSize = UDPPacketMaxLength + } + + if params.Logger == nil { + params.Logger = log.StdLogger } - thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize) + if !params.DisableAttemptReconnecting && params.AttemptReconnectInterval == 0 { + params.AttemptReconnectInterval = time.Second * 30 + } + + thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize) protocolFactory := thrift.NewTCompactProtocolFactory() client := agent.NewAgentClientFactory(thriftBuffer, protocolFactory) - destAddr, err := net.ResolveUDPAddr("udp", hostPort) - if err != nil { - return nil, err - } + var connUDP udpConn + var err error - connUDP, err := net.DialUDP(destAddr.Network(), nil, destAddr) - if err != nil { - return nil, err + if params.DisableAttemptReconnecting { + destAddr, err := net.ResolveUDPAddr("udp", params.HostPort) + if err != nil { + return nil, err + } + + connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr) + if err != nil { + return nil, err + } + } else { + // host is hostname, setup resolver loop in case host record changes during operation + connUDP, err = newReconnectingUDPConn(params.HostPort, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger) + if err != nil { + return nil, err + } } - if err := connUDP.SetWriteBuffer(maxPacketSize); err != nil { + + if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil { return nil, err } - clientUDP := &AgentClientUDP{ + return &AgentClientUDP{ connUDP: connUDP, client: client, - maxPacketSize: maxPacketSize, - thriftBuffer: thriftBuffer} - return clientUDP, nil + maxPacketSize: params.MaxPacketSize, + thriftBuffer: thriftBuffer, + }, nil +} + +// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP. +func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) { + return NewAgentClientUDPWithParams(AgentClientUDPParams{ + HostPort: hostPort, + MaxPacketSize: maxPacketSize, + }) } // EmitZipkinBatch implements EmitZipkinBatch() of Agent interface diff --git a/utils/udp_client_test.go b/utils/udp_client_test.go new file mode 100644 index 00000000..9adcfef5 --- /dev/null +++ b/utils/udp_client_test.go @@ -0,0 +1,118 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "net" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/jaeger-client-go/log" + "github.com/uber/jaeger-client-go/thrift-gen/zipkincore" +) + +func TestNewAgentClientUDPWithParamsBadHostport(t *testing.T) { + hostPort := "blahblah" + + agentClient, err := NewAgentClientUDPWithParams(AgentClientUDPParams{ + HostPort: hostPort, + }) + + assert.Error(t, err) + assert.Nil(t, agentClient) +} + +func TestNewAgentClientUDPWithParams(t *testing.T) { + mockServer, err := newUDPListener() + require.NoError(t, err) + defer mockServer.Close() + + agentClient, err := NewAgentClientUDPWithParams(AgentClientUDPParams{ + HostPort: mockServer.LocalAddr().String(), + MaxPacketSize: 25000, + Logger: log.NullLogger, + }) + assert.NoError(t, err) + assert.NotNil(t, agentClient) + assert.Equal(t, 25000, agentClient.maxPacketSize) + + if assert.IsType(t, &reconnectingUDPConn{}, agentClient.connUDP) { + assert.Equal(t, log.NullLogger, agentClient.connUDP.(*reconnectingUDPConn).logger) + } + + assert.NoError(t, agentClient.Close()) +} + +func TestNewAgentClientUDPWithParamsDefaults(t *testing.T) { + mockServer, err := newUDPListenerOnPort(6831) + require.NoError(t, err) + defer mockServer.Close() + + agentClient, err := NewAgentClientUDPWithParams(AgentClientUDPParams{ + HostPort: "localhost:6831", + }) + assert.NoError(t, err) + assert.NotNil(t, agentClient) + assert.Equal(t, UDPPacketMaxLength, agentClient.maxPacketSize) + + if assert.IsType(t, &reconnectingUDPConn{}, agentClient.connUDP) { + assert.Equal(t, agentClient.connUDP.(*reconnectingUDPConn).logger, log.StdLogger) + } + + assert.NoError(t, agentClient.Close()) +} + +func TestNewAgentClientUDPDefaults(t *testing.T) { + mockServer, err := newUDPListenerOnPort(6831) + require.NoError(t, err) + defer mockServer.Close() + + agentClient, err := NewAgentClientUDP("localhost:6831", 0) + assert.NoError(t, err) + assert.NotNil(t, agentClient) + assert.Equal(t, UDPPacketMaxLength, agentClient.maxPacketSize) + + if assert.IsType(t, &reconnectingUDPConn{}, agentClient.connUDP) { + assert.Equal(t, agentClient.connUDP.(*reconnectingUDPConn).logger, log.StdLogger) + } + + assert.NoError(t, agentClient.Close()) +} + +func TestNewAgentClientUDPWithParamsReconnectingDisabled(t *testing.T) { + mockServer, err := newUDPListener() + require.NoError(t, err) + defer mockServer.Close() + + agentClient, err := NewAgentClientUDPWithParams(AgentClientUDPParams{ + HostPort: mockServer.LocalAddr().String(), + Logger: log.NullLogger, + DisableAttemptReconnecting: true, + }) + assert.NoError(t, err) + assert.NotNil(t, agentClient) + assert.Equal(t, UDPPacketMaxLength, agentClient.maxPacketSize) + + assert.IsType(t, &net.UDPConn{}, agentClient.connUDP) + + assert.NoError(t, agentClient.Close()) +} + +func TestAgentClientUDPNotSupportZipkin(t *testing.T) { + agentClient := AgentClientUDP{} + + assert.Error(t, agentClient.EmitZipkinBatch([]*zipkincore.Span{{Name: "fakespan"}})) +}