Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor agent configuration #1092

Merged
merged 11 commits into from
Oct 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/agent/app/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

jmetrics "github.com/jaegertracing/jaeger/pkg/metrics"
Expand All @@ -33,7 +34,7 @@ import (

func TestAgentStartError(t *testing.T) {
cfg := &Builder{}
agent, err := cfg.CreateAgent(zap.NewNop())
agent, err := cfg.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory)
require.NoError(t, err)
agent.httpServer.Addr = "bad-address"
assert.Error(t, agent.Run())
Expand Down Expand Up @@ -100,7 +101,8 @@ func withRunningAgent(t *testing.T, testcase func(string, chan error)) {
},
}
logger, logBuf := testutils.NewLogger()
agent, err := cfg.CreateAgent(logger)
f, _ := cfg.Metrics.CreateMetricsFactory("jaeger")
agent, err := cfg.CreateAgent(fakeCollectorProxy{}, logger, f)
require.NoError(t, err)
ch := make(chan error, 2)
go func() {
Expand Down
92 changes: 33 additions & 59 deletions cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@ package app
import (
"fmt"
"net/http"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/tchannel-go"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
"github.com/jaegertracing/jaeger/cmd/agent/app/processors"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
tchreporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp"
jmetrics "github.com/jaegertracing/jaeger/pkg/metrics"
Expand All @@ -37,11 +34,9 @@ import (
)

const (
defaultQueueSize = 1000
defaultMaxPacketSize = 65000
defaultServerWorkers = 10
defaultMinPeers = 3
defaultConnCheckTimeout = 250 * time.Millisecond
defaultQueueSize = 1000
defaultMaxPacketSize = 65000
defaultServerWorkers = 10

defaultHTTPServerHostPort = ":5778"

Expand All @@ -67,16 +62,19 @@ var (
}
)

// CollectorProxy provides access to Reporter and ClientConfigManager
type CollectorProxy interface {
GetReporter() reporter.Reporter
GetManager() httpserver.ClientConfigManager
}

// Builder Struct to hold configurations
type Builder struct {
Processors []ProcessorConfiguration `yaml:"processors"`
HTTPServer HTTPServerConfiguration `yaml:"httpServer"`
Metrics jmetrics.Builder `yaml:"metrics"`

tchreporter.Builder `yaml:",inline"`

otherReporters []reporter.Reporter
metricsFactory metrics.Factory
reporters []reporter.Reporter
}

// ProcessorConfiguration holds config for a processor that receives spans from Server
Expand All @@ -100,62 +98,35 @@ type HTTPServerConfiguration struct {
}

// WithReporter adds auxiliary reporters.
func (b *Builder) WithReporter(r reporter.Reporter) *Builder {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
b.otherReporters = append(b.otherReporters, r)
return b
}

// WithMetricsFactory sets an externally initialized metrics factory.
func (b *Builder) WithMetricsFactory(mf metrics.Factory) *Builder {
b.metricsFactory = mf
func (b *Builder) WithReporter(r ...reporter.Reporter) *Builder {
b.reporters = append(b.reporters, r...)
return b
}

func (b *Builder) createMainReporter(mFactory metrics.Factory, logger *zap.Logger) (*tchreporter.Reporter, error) {
return b.CreateReporter(mFactory, logger)
}

func (b *Builder) getMetricsFactory() (metrics.Factory, error) {
if b.metricsFactory != nil {
return b.metricsFactory, nil
}

baseFactory, err := b.Metrics.CreateMetricsFactory("jaeger")
// CreateAgent creates the Agent
func (b *Builder) CreateAgent(primaryProxy CollectorProxy, logger *zap.Logger, mFactory metrics.Factory) (*Agent, error) {
r := b.getReporter(primaryProxy)
processors, err := b.getProcessors(r, mFactory, logger)
if err != nil {
return nil, err
}

return baseFactory.Namespace("agent", nil), nil
server := b.HTTPServer.getHTTPServer(primaryProxy.GetManager(), mFactory, &b.Metrics)
return NewAgent(processors, server, logger), nil
}

// CreateAgent creates the Agent
func (b *Builder) CreateAgent(logger *zap.Logger) (*Agent, error) {
mFactory, err := b.getMetricsFactory()
if err != nil {
return nil, errors.Wrap(err, "cannot create metrics factory")
}
mainReporter, err := b.createMainReporter(mFactory, logger)
if err != nil {
return nil, errors.Wrap(err, "cannot create main Reporter")
func (b *Builder) getReporter(primaryProxy CollectorProxy) reporter.Reporter {
if len(b.reporters) == 0 {
return primaryProxy.GetReporter()
}
var rep reporter.Reporter = mainReporter
if len(b.otherReporters) > 0 {
reps := append([]reporter.Reporter{mainReporter}, b.otherReporters...)
rep = reporter.NewMultiReporter(reps...)
rep := make([]reporter.Reporter, len(b.reporters)+1)
rep[0] = primaryProxy.GetReporter()
for i, r := range b.reporters {
rep[i+1] = r
}
processors, err := b.GetProcessors(rep, mFactory, logger)
if err != nil {
return nil, err
}
httpServer := b.HTTPServer.GetHTTPServer(b.CollectorServiceName, mainReporter.Channel(), mFactory)
if h := b.Metrics.Handler(); mFactory != nil && h != nil {
httpServer.Handler.(*http.ServeMux).Handle(b.Metrics.HTTPRoute, h)
}
return NewAgent(processors, httpServer, logger), nil
return reporter.NewMultiReporter(rep...)
}

// GetProcessors creates Processors with attached Reporter
func (b *Builder) GetProcessors(rep reporter.Reporter, mFactory metrics.Factory, logger *zap.Logger) ([]processors.Processor, error) {
func (b *Builder) getProcessors(rep reporter.Reporter, mFactory metrics.Factory, logger *zap.Logger) ([]processors.Processor, error) {
retMe := make([]processors.Processor, len(b.Processors))
for idx, cfg := range b.Processors {
protoFactory, ok := protocolFactoryMap[cfg.Protocol]
Expand Down Expand Up @@ -185,12 +156,15 @@ func (b *Builder) GetProcessors(rep reporter.Reporter, mFactory metrics.Factory,
}

// GetHTTPServer creates an HTTP server that provides sampling strategies and baggage restrictions to client libraries.
func (c HTTPServerConfiguration) GetHTTPServer(svc string, channel *tchannel.Channel, mFactory metrics.Factory) *http.Server {
mgr := httpserver.NewCollectorProxy(svc, channel, mFactory)
func (c HTTPServerConfiguration) getHTTPServer(manager httpserver.ClientConfigManager, mFactory metrics.Factory, mBuilder *jmetrics.Builder) *http.Server {
if c.HostPort == "" {
c.HostPort = defaultHTTPServerHostPort
}
return httpserver.NewHTTPServer(c.HostPort, mgr, mFactory)
server := httpserver.NewHTTPServer(c.HostPort, manager, mFactory)
if h := mBuilder.Handler(); mFactory != nil && h != nil {
server.Handler.(*http.ServeMux).Handle(mBuilder.HTTPRoute, h)
}
return server
}

// GetThriftProcessor gets a TBufferedServer backed Processor using the collector configuration
Expand Down
91 changes: 37 additions & 54 deletions cmd/agent/app/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package app

import (
"errors"
"fmt"
"strings"
"testing"

Expand All @@ -25,7 +26,11 @@ import (
"go.uber.org/zap"
"gopkg.in/yaml.v2"

"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

Expand All @@ -51,14 +56,6 @@ processors:

httpServer:
hostPort: 4.4.4.4:5778

collectorHostPorts:
- 127.0.0.1:14267
- 127.0.0.1:14268
- 127.0.0.1:14269

collectorServiceName: some-collector-service
minPeers: 4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to confirm - this is no longer supported? I thought there was a fallback with the options that might make the old config still work.

Copy link
Member Author

@pavolloffay pavolloffay Oct 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

old flags are supported and deprecated https://github.com/jaegertracing/jaeger/pull/1092/files#diff-1fc7689d387f4303d6e478ca01841286R29. The new flags take precedence over old ones.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jaeger agent is a daemon program that runs on every host and receives tracing data submitted by Jaeger client libraries.

Usage:
  jaeger-agent [flags]
  jaeger-agent [command]

Available Commands:
  help        Help about any command
  version     Print the version

Flags:
      --collector.host-port string                                Deprecated; comma-separated string representing host:ports of a static list of collectors to connect to directly (e.g. when not using service discovery)
      --config-file string                                        Configuration file in JSON, TOML, YAML, HCL, or Java properties formats (default none). See spf13/viper for precedence.
      --discovery.conn-check-timeout duration                     Deprecated; sets the timeout used when establishing new connections (default 250ms)
      --discovery.min-peers int                                   Deprecated; if using service discovery, the min number of connections to maintain to the backend (default 3)
  -h, --help                                                      help for jaeger-agent
      --http-server.host-port string                              host:port of the http server (e.g. for /sampling point and /baggage endpoint) (default ":5778")
      --log-level string                                          Minimal allowed log Level. For more levels see https://github.com/uber-go/zap (default "info")
      --metrics-backend string                                    Defines which metrics backend to use for metrics reporting: expvar, prometheus, none (default "prometheus")
      --metrics-http-route string                                 Defines the route of HTTP endpoint for metrics backends that support scraping (default "/metrics")
      --processor.jaeger-binary.server-host-port string           host:port for the UDP server (default ":6832")
      --processor.jaeger-binary.server-max-packet-size int        max packet size for the UDP server (default 65000)
      --processor.jaeger-binary.server-queue-size int             length of the queue for the UDP server (default 1000)
      --processor.jaeger-binary.workers int                       how many workers the processor should run (default 10)
      --processor.jaeger-compact.server-host-port string          host:port for the UDP server (default ":6831")
      --processor.jaeger-compact.server-max-packet-size int       max packet size for the UDP server (default 65000)
      --processor.jaeger-compact.server-queue-size int            length of the queue for the UDP server (default 1000)
      --processor.jaeger-compact.workers int                      how many workers the processor should run (default 10)
      --processor.zipkin-compact.server-host-port string          host:port for the UDP server (default ":5775")
      --processor.zipkin-compact.server-max-packet-size int       max packet size for the UDP server (default 65000)
      --processor.zipkin-compact.server-queue-size int            length of the queue for the UDP server (default 1000)
      --processor.zipkin-compact.workers int                      how many workers the processor should run (default 10)
      --reporter.tchannel.collector.host-port string              comma-separated string representing host:ports of a static list of collectors to connect to directly (e.g. when not using service discovery)
      --reporter.tchannel.discovery.conn-check-timeout duration   sets the timeout used when establishing new connections (default 250ms)
      --reporter.tchannel.discovery.min-peers int                 if using service discovery, the min number of connections to maintain to the backend (default 3)

Use "jaeger-agent [command] --help" for more information about a command.

`

func TestBuilderFromConfig(t *testing.T) {
Expand Down Expand Up @@ -101,59 +98,24 @@ func TestBuilderFromConfig(t *testing.T) {
},
}, cfg.Processors[2])
assert.Equal(t, "4.4.4.4:5778", cfg.HTTPServer.HostPort)

assert.Equal(t, 4, cfg.DiscoveryMinPeers)
assert.Equal(t, "some-collector-service", cfg.CollectorServiceName)
assert.Equal(
t,
[]string{"127.0.0.1:14267", "127.0.0.1:14268", "127.0.0.1:14269"},
cfg.CollectorHostPorts)
}

func TestBuilderWithExtraReporter(t *testing.T) {
cfg := &Builder{}
cfg.WithReporter(fakeReporter{})
agent, err := cfg.CreateAgent(zap.NewNop())
agent, err := cfg.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory)
assert.NoError(t, err)
assert.NotNil(t, agent)
}

func TestBuilderMetrics(t *testing.T) {
mf := metrics.NullFactory
b := new(Builder).WithMetricsFactory(mf)
mf2, err := b.getMetricsFactory()
assert.NoError(t, err)
assert.Equal(t, mf, mf2)
}

func TestBuilderMetricsHandler(t *testing.T) {
b := &Builder{}
b.Metrics.Backend = "expvar"
b.Metrics.HTTPRoute = "/expvar"
factory, err := b.Metrics.CreateMetricsFactory("test")
assert.NoError(t, err)
assert.NotNil(t, factory)
b.metricsFactory = factory
agent, err := b.CreateAgent(zap.NewNop())
agent, err := b.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory)
assert.NoError(t, err)
assert.NotNil(t, agent)
}

func TestBuilderMetricsError(t *testing.T) {
b := &Builder{}
b.Metrics.Backend = "invalid"
_, err := b.CreateAgent(zap.NewNop())
assert.EqualError(t, err, "cannot create metrics factory: unknown metrics backend specified")
}

func TestBuilderWithDiscoveryError(t *testing.T) {
cfg := &Builder{}
cfg.WithDiscoverer(fakeDiscoverer{})
agent, err := cfg.CreateAgent(zap.NewNop())
assert.EqualError(t, err, "cannot create main Reporter: cannot enable service discovery: both discovery.Discoverer and discovery.Notifier must be specified")
assert.Nil(t, agent)
}

func TestBuilderWithProcessorErrors(t *testing.T) {
testCases := []struct {
model Model
Expand All @@ -180,7 +142,7 @@ func TestBuilderWithProcessorErrors(t *testing.T) {
},
},
}
_, err := cfg.CreateAgent(zap.NewNop())
_, err := cfg.CreateAgent(&fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory)
assert.Error(t, err)
if testCase.err != "" {
assert.EqualError(t, err, testCase.err)
Expand All @@ -190,18 +152,39 @@ func TestBuilderWithProcessorErrors(t *testing.T) {
}
}

type fakeReporter struct{}
func TestMultipleCollectorProxies(t *testing.T) {
b := Builder{}
ra := fakeCollectorProxy{}
rb := fakeCollectorProxy{}
b.WithReporter(ra)
r := b.getReporter(rb)
mr, ok := r.(reporter.MultiReporter)
require.True(t, ok)
fmt.Println(mr)
assert.Equal(t, rb, mr[0])
assert.Equal(t, ra, mr[1])
}

func (fr fakeReporter) EmitZipkinBatch(spans []*zipkincore.Span) (err error) {
return nil
type fakeCollectorProxy struct {
}

func (fr fakeReporter) EmitBatch(batch *jaeger.Batch) (err error) {
return nil
func (f fakeCollectorProxy) GetReporter() reporter.Reporter {
return fakeCollectorProxy{}
}
func (f fakeCollectorProxy) GetManager() httpserver.ClientConfigManager {
return fakeCollectorProxy{}
}

type fakeDiscoverer struct{}
func (fakeCollectorProxy) EmitZipkinBatch(spans []*zipkincore.Span) (err error) {
return nil
}
func (fakeCollectorProxy) EmitBatch(batch *jaeger.Batch) (err error) {
return nil
}

func (fd fakeDiscoverer) Instances() ([]string, error) {
return nil, errors.New("discoverer error")
func (f fakeCollectorProxy) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
return nil, errors.New("no peers available")
}
func (fakeCollectorProxy) GetBaggageRestrictions(serviceName string) ([]*baggage.BaggageRestriction, error) {
return nil, nil
}
21 changes: 0 additions & 21 deletions cmd/agent/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package app
import (
"flag"
"fmt"
"strings"

"github.com/spf13/viper"
)
Expand All @@ -27,10 +26,7 @@ const (
suffixServerQueueSize = "server-queue-size"
suffixServerMaxPacketSize = "server-max-packet-size"
suffixServerHostPort = "server-host-port"
collectorHostPort = "collector.host-port"
httpServerHostPort = "http-server.host-port"
discoveryMinPeers = "discovery.min-peers"
discoveryConnCheckTimeout = "discovery.conn-check-timeout"
)

var defaultProcessors = []struct {
Expand All @@ -52,22 +48,10 @@ func AddFlags(flags *flag.FlagSet) {
flags.Int(prefix+suffixServerMaxPacketSize, defaultMaxPacketSize, "max packet size for the UDP server")
flags.String(prefix+suffixServerHostPort, processor.hostPort, "host:port for the UDP server")
}
flags.String(
collectorHostPort,
"",
"comma-separated string representing host:ports of a static list of collectors to connect to directly (e.g. when not using service discovery)")
flags.String(
httpServerHostPort,
defaultHTTPServerHostPort,
"host:port of the http server (e.g. for /sampling point and /baggage endpoint)")
flags.Int(
discoveryMinPeers,
defaultMinPeers,
"if using service discovery, the min number of connections to maintain to the backend")
flags.Duration(
discoveryConnCheckTimeout,
defaultConnCheckTimeout,
"sets the timeout used when establishing new connections")
}

// InitFromViper initializes Builder with properties retrieved from Viper.
Expand All @@ -84,11 +68,6 @@ func (b *Builder) InitFromViper(v *viper.Viper) *Builder {
b.Processors = append(b.Processors, *p)
}

if len(v.GetString(collectorHostPort)) > 0 {
b.CollectorHostPorts = strings.Split(v.GetString(collectorHostPort), ",")
}
b.HTTPServer.HostPort = v.GetString(httpServerHostPort)
b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers)
b.ConnCheckTimeout = v.GetDuration(discoveryConnCheckTimeout)
return b
}
Loading