Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Add resolved udp connection type, continually resolve dns names in background #520

Merged
merged 25 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7f1085d
Add resolved udp connection type, continually resolve dns names in
terev Jun 24, 2020
3690bcb
Be sure to set buffer bytes width on new connections.
terev Jun 26, 2020
e7e7b32
Lock when checking if resolved addr is new.
terev Jun 26, 2020
7994cdf
Fixes from review comments. Dont return error if UDPConn fails on sta…
terev Jun 29, 2020
66b7ec4
Fix failing test. Apparently the linux kernel returns the sockopt val
terev Jun 29, 2020
fc0d3eb
Use atomic ops to manage bufferBytes instead of locking mutex
terev Jun 29, 2020
c47cc4b
Fix buffer bytes assert because sock opt value is not guaranteed to b…
terev Jun 29, 2020
aaf0c9b
Remove intermediate init helpers, initialize close chann in struct
terev Jun 30, 2020
0264dd8
Fixes based on comments, more tests for udp_client.go, and test for
terev Jul 3, 2020
8ea3b31
Run make fmt
terev Jul 3, 2020
d4f29aa
Remove unused struct field
terev Jul 3, 2020
fd62566
Fix lint error
terev Jul 3, 2020
d70dc93
Add test for new conn established when host record changes
terev Jul 3, 2020
4161d17
Fix comment typo
terev Jul 3, 2020
0d18d0d
Add test for failed write retry
terev Jul 3, 2020
044e26a
Add test calling NewAgentClientUDP
terev Jul 3, 2020
9c36339
Remove sleep on last try evaluating connection condition
terev Jul 3, 2020
70bff5e
Rename resolved udp conn to reconnecting udp conn, add opt-out option
terev Jul 7, 2020
3aedb36
Remove irrelevant comment, fix transport max packet size regression
terev Jul 7, 2020
f9de33b
Add panic in case the test server listen or srv fails unexpectedly
terev Jul 7, 2020
19fb557
Add coverage for new env vars
terev Jul 7, 2020
3003b3c
Run make fmt
terev Jul 7, 2020
3e254f0
Add back constants, add helper for generating a mock udp addr, require
terev Jul 8, 2020
2e4ea48
Remove local agent constants from utils
terev Jul 8, 2020
da03468
Move no error requirement into mock udp addr constructor
terev Jul 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (rc *ReporterConfig) NewReporter(
metrics *jaeger.Metrics,
logger jaeger.Logger,
) (jaeger.Reporter, error) {
sender, err := rc.newTransport()
sender, err := rc.newTransport(logger)
if err != nil {
return nil, err
}
Expand All @@ -401,7 +401,7 @@ func (rc *ReporterConfig) NewReporter(
return reporter, err
}

func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) {
func (rc *ReporterConfig) newTransport(logger jaeger.Logger) (jaeger.Transport, error) {
switch {
case rc.CollectorEndpoint != "":
httpOptions := []transport.HTTPOption{transport.HTTPBatchSize(1), transport.HTTPHeaders(rc.HTTPHeaders)}
Expand All @@ -410,6 +410,9 @@ func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) {
}
return transport.NewHTTPTransport(rc.CollectorEndpoint, httpOptions...), nil
default:
return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0)
return jaeger.NewUDPTransportWithParams(jaeger.UDPTransportParams{
HostPort: rc.LocalAgentHostPort,
Logger: logger,
})
}
}
6 changes: 3 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,15 +561,15 @@ func TestInvalidSamplerType(t *testing.T) {
func TestUDPTransportType(t *testing.T) {
rc := &ReporterConfig{LocalAgentHostPort: "localhost:1234"}
expect, _ := jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0)
sender, err := rc.newTransport()
sender, err := rc.newTransport(log.NullLogger)
require.NoError(t, err)
require.IsType(t, expect, sender)
}

func TestHTTPTransportType(t *testing.T) {
rc := &ReporterConfig{CollectorEndpoint: "http://1.2.3.4:5678/api/traces"}
expect := transport.NewHTTPTransport(rc.CollectorEndpoint)
sender, err := rc.newTransport()
sender, err := rc.newTransport(log.NullLogger)
require.NoError(t, err)
require.IsType(t, expect, sender)
}
Expand All @@ -581,7 +581,7 @@ func TestHTTPTransportTypeWithAuth(t *testing.T) {
Password: "auth_pass",
}
expect := transport.NewHTTPTransport(rc.CollectorEndpoint)
sender, err := rc.newTransport()
sender, err := rc.newTransport(log.NullLogger)
require.NoError(t, err)
require.IsType(t, expect, sender)
}
Expand Down
45 changes: 36 additions & 9 deletions transport_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/uber/jaeger-client-go/internal/reporterstats"
"github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-client-go/thrift"
j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
"github.com/uber/jaeger-client-go/utils"
Expand Down Expand Up @@ -57,35 +58,61 @@ type udpSender struct {
failedToEmitSpans int64
}

// NewUDPTransport creates a reporter that submits spans to jaeger-agent.
// UDPTransportParams allows specifying options for initializing a UDPTransport. An instance of this struct should
// be passed to NewUDPTransportWithParams.
type UDPTransportParams struct {
HostPort string
MaxPacketSize int
Logger log.Logger
}

// NewUDPTransportWithParams creates a reporter that submits spans to jaeger-agent.
// TODO: (breaking change) move to transport/ package.
func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
if len(hostPort) == 0 {
hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
func NewUDPTransportWithParams(params UDPTransportParams) (Transport, error) {
if len(params.HostPort) == 0 {
params.HostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
}
if maxPacketSize == 0 {
maxPacketSize = utils.UDPPacketMaxLength

if params.MaxPacketSize == 0 {
params.MaxPacketSize = utils.UDPPacketMaxLength
}
terev marked this conversation as resolved.
Show resolved Hide resolved

if params.Logger == nil {
params.Logger = log.StdLogger
}

protocolFactory := thrift.NewTCompactProtocolFactory()

// Each span is first written to thriftBuffer to determine its size in bytes.
thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize)
thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
thriftProtocol := protocolFactory.GetProtocol(thriftBuffer)

client, err := utils.NewAgentClientUDP(hostPort, maxPacketSize)
client, err := utils.NewAgentClientUDPWithParams(utils.AgentClientUDPParams{
HostPort: params.HostPort,
MaxPacketSize: params.MaxPacketSize,
Logger: params.Logger,
})
if err != nil {
return nil, err
}

return &udpSender{
client: client,
maxSpanBytes: maxPacketSize - emitBatchOverhead,
maxSpanBytes: params.MaxPacketSize - emitBatchOverhead,
thriftBuffer: thriftBuffer,
thriftProtocol: thriftProtocol,
}, nil
}

// NewUDPTransport creates a reporter that submits spans to jaeger-agent.
// TODO: (breaking change) move to transport/ package.
func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
return NewUDPTransportWithParams(UDPTransportParams{
HostPort: hostPort,
MaxPacketSize: maxPacketSize,
})
}

// SetReporterStats implements reporterstats.Receiver.
func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) {
s.reporterStats = rs
Expand Down
Loading