diff --git a/cmd/agent/app/reporter/flags.go b/cmd/agent/app/reporter/flags.go index b9fbb4877dd..c10dbed3cc9 100644 --- a/cmd/agent/app/reporter/flags.go +++ b/cmd/agent/app/reporter/flags.go @@ -17,18 +17,21 @@ package reporter import ( "flag" "fmt" + "os" + "strings" "github.com/spf13/viper" ) const ( + // Whether to use grpc or tchannel reporter. reporterType = "reporter.type" + // Agent tags + agentTags = "jaeger.tags" // TCHANNEL is name of tchannel reporter. TCHANNEL Type = "tchannel" // GRPC is name of gRPC reporter. GRPC Type = "grpc" - // Agent tags - agentTags = "jaeger.tags" ) // Type defines type of reporter. @@ -37,7 +40,7 @@ type Type string // Options holds generic reporter configuration. type Options struct { ReporterType Type - AgentTags Type + AgentTags map[Type]Type } // AddFlags adds flags for Options. @@ -49,6 +52,29 @@ func AddFlags(flags *flag.FlagSet) { // InitFromViper initializes Options with properties retrieved from Viper. func (b *Options) InitFromViper(v *viper.Viper) *Options { b.ReporterType = Type(v.GetString(reporterType)) - b.AgentTags = Type(v.GetString(agentTags)) + b.AgentTags = parseAgentTags(Type(v.GetString(agentTags))) return b } + +// Parsing logic borrowed from jaegertracing/jaeger-client-go +func parseAgentTags(agentTags Type) map[Type]Type { + tagPairs := strings.Split(string(agentTags), ",") + var tags map[Type]Type + for _, p := range tagPairs { + kv := strings.SplitN(p, "=", 2) + k, v := Type(strings.TrimSpace(kv[0])), Type(strings.TrimSpace(kv[1])) + + if strings.HasPrefix(string(v), "${") && strings.HasSuffix(string(v), "}") { + ed := strings.SplitN(string(v[2:len(v)-1]), ":", 2) + e, d := ed[0], ed[1] + v = Type(os.Getenv(e)) + if v == "" && d != "" { + v = Type(d) + } + } + + tags[k] = v + } + + return tags +} \ No newline at end of file diff --git a/cmd/agent/app/reporter/flags_test.go b/cmd/agent/app/reporter/flags_test.go index 354a240327a..5b21d83b8d5 100644 --- a/cmd/agent/app/reporter/flags_test.go +++ b/cmd/agent/app/reporter/flags_test.go @@ -34,10 +34,12 @@ func TestBindFlags(t *testing.T) { err := command.ParseFlags([]string{ "--reporter.type=grpc", + "--jaeger.tag key=value", }) require.NoError(t, err) b := &Options{} b.InitFromViper(v) assert.Equal(t, Type("grpc"), b.ReporterType) -} + assert.Equal(t, parseAgentTags(Type("key=value")), b.AgentTags) +} \ No newline at end of file diff --git a/cmd/agent/app/reporter/grpc/collector_proxy.go b/cmd/agent/app/reporter/grpc/collector_proxy.go index 30ff15a4799..06fbdd4a18f 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy.go @@ -42,7 +42,7 @@ type ProxyBuilder struct { var systemCertPool = x509.SystemCertPool // to allow overriding in unit test // NewCollectorProxy creates ProxyBuilder -func NewCollectorProxy(o *Options, agentTags string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) { +func NewCollectorProxy(o *Options, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) { if len(o.CollectorHostPort) == 0 { return nil, errors.New("could not create collector proxy, address is missing") } diff --git a/cmd/agent/app/reporter/grpc/reporter.go b/cmd/agent/app/reporter/grpc/reporter.go index 9abfc9ad36a..e3c2e0aa44b 100644 --- a/cmd/agent/app/reporter/grpc/reporter.go +++ b/cmd/agent/app/reporter/grpc/reporter.go @@ -38,18 +38,18 @@ type Reporter struct { } // NewReporter creates gRPC reporter. -func NewReporter(conn *grpc.ClientConn, agentTags string, logger *zap.Logger) *Reporter { +func NewReporter(conn *grpc.ClientConn, agentTags map[string]string, logger *zap.Logger) *Reporter { return &Reporter{ collector: api_v2.NewCollectorServiceClient(conn), logger: logger, sanitizer: zipkin2.NewChainedSanitizer(zipkin2.StandardSanitizers...), - agentTags: parseAgentTags(agentTags), + agentTags: makeModelKeyValue(agentTags), } } // EmitBatch implements EmitBatch() of Reporter func (r *Reporter) EmitBatch(b *thrift.Batch) error { - return r.send(addAgentTags(jConverter.ToDomain(b.Spans, nil), r.agentTags), jConverter.ToDomainProcess(b.Process)) + return r.send(jConverter.ToDomain(b.Spans, nil), jConverter.ToDomainProcess(b.Process)) } // EmitZipkinBatch implements EmitZipkinBatch() of Reporter @@ -61,10 +61,11 @@ func (r *Reporter) EmitZipkinBatch(zSpans []*zipkincore.Span) error { if err != nil { return err } - return r.send(addAgentTags(trace.Spans, r.agentTags), nil) + return r.send(trace.Spans, nil) } func (r *Reporter) send(spans []*model.Span, process *model.Process) error { + spans = addAgentTags(spans, r.agentTags) batch := model.Batch{Spans: spans, Process: process} req := &api_v2.PostSpansRequest{Batch: batch} _, err := r.collector.PostSpans(context.Background(), req) @@ -84,23 +85,9 @@ func addAgentTags(spans []*model.Span, agentTags []model.KeyValue) []*model.Span return spans } -// Parsing logic borrowed from jaegertracing/jaeger-client-go -func parseAgentTags(agentTags string) []model.KeyValue { - tagPairs := strings.Split(agentTags, ",") +func makeModelKeyValue(agentTags map[string]string) []model.KeyValue { tags := make([]model.KeyValue, 0) - for _, p := range tagPairs { - kv := strings.SplitN(p, "=", 2) - k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1]) - - if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") { - ed := strings.SplitN(v[2:len(v)-1], ":", 2) - e, d := ed[0], ed[1] - v = os.Getenv(e) - if v == "" && d != "" { - v = d - } - } - + for k, v := range agentTags { tag := model.KeyValue{ Key: k, VStr: v, diff --git a/cmd/agent/app/reporter/tchannel/builder.go b/cmd/agent/app/reporter/tchannel/builder.go index d4254e82f0d..632c8cc9228 100644 --- a/cmd/agent/app/reporter/tchannel/builder.go +++ b/cmd/agent/app/reporter/tchannel/builder.go @@ -106,7 +106,7 @@ func (b *Builder) enableDiscovery(channel *tchannel.Channel, logger *zap.Logger) } // CreateReporter creates the TChannel-based Reporter -func (b *Builder) CreateReporter(agentTags string, logger *zap.Logger) (*Reporter, error) { +func (b *Builder) CreateReporter(agentTags map[string]string, logger *zap.Logger) (*Reporter, error) { if b.channel == nil { // ignore errors since it only happens on empty service name b.channel, _ = tchannel.NewChannel(agentServiceName, nil) diff --git a/cmd/agent/app/reporter/tchannel/builder_test.go b/cmd/agent/app/reporter/tchannel/builder_test.go index 52ed318f0d7..7aa095c39e0 100644 --- a/cmd/agent/app/reporter/tchannel/builder_test.go +++ b/cmd/agent/app/reporter/tchannel/builder_test.go @@ -49,7 +49,7 @@ func TestBuilderFromConfig(t *testing.T) { t, []string{"127.0.0.1:14267", "127.0.0.1:14268", "127.0.0.1:14269"}, cfg.CollectorHostPorts) - r, err := cfg.CreateReporter("", zap.NewNop()) + r, err := cfg.CreateReporter(nil, zap.NewNop()) require.NoError(t, err) assert.NotNil(t, r) assert.Equal(t, "some-collector-service", r.CollectorServiceName()) @@ -60,20 +60,20 @@ func TestBuilderWithDiscovery(t *testing.T) { cfg := &Builder{} discoverer := discovery.FixedDiscoverer([]string{"1.1.1.1:80"}) cfg.WithDiscoverer(discoverer) - _, err := cfg.CreateReporter("", zap.NewNop()) + _, err := cfg.CreateReporter(nil, zap.NewNop()) assert.EqualError(t, err, "cannot enable service discovery: both discovery.Discoverer and discovery.Notifier must be specified") cfg = &Builder{} notifier := &discovery.Dispatcher{} cfg.WithDiscoverer(discoverer).WithDiscoveryNotifier(notifier) - agent, err := cfg.CreateReporter("", zap.NewNop()) + agent, err := cfg.CreateReporter(nil, zap.NewNop()) assert.NoError(t, err) assert.NotNil(t, agent) } func TestBuilderWithDiscoveryError(t *testing.T) { tbuilder := NewBuilder().WithDiscoverer(fakeDiscoverer{}) - rep, err := tbuilder.CreateReporter("", zap.NewNop()) + rep, err := tbuilder.CreateReporter(nil, zap.NewNop()) assert.EqualError(t, err, "cannot enable service discovery: both discovery.Discoverer and discovery.Notifier must be specified") assert.Nil(t, rep) } @@ -81,13 +81,13 @@ func TestBuilderWithDiscoveryError(t *testing.T) { func TestBuilderWithCollectorServiceName(t *testing.T) { cfg := &Builder{} cfg.WithCollectorServiceName("svc") - agent, err := cfg.CreateReporter("", zap.NewNop()) + agent, err := cfg.CreateReporter(nil, zap.NewNop()) assert.NoError(t, err) assert.NotNil(t, agent) assert.Equal(t, cfg.CollectorServiceName, "svc") cfg = NewBuilder() - agent, err = cfg.CreateReporter("", zap.NewNop()) + agent, err = cfg.CreateReporter(nil, zap.NewNop()) assert.NoError(t, err) assert.NotNil(t, agent) assert.Equal(t, cfg.CollectorServiceName, "jaeger-collector") @@ -97,7 +97,7 @@ func TestBuilderWithChannel(t *testing.T) { cfg := &Builder{} channel, _ := tchannel.NewChannel(agentServiceName, nil) cfg.WithChannel(channel) - rep, err := cfg.CreateReporter("", zap.NewNop()) + rep, err := cfg.CreateReporter(nil, zap.NewNop()) assert.NoError(t, err) assert.NotNil(t, rep.Channel()) assert.Equal(t, defaultCollectorServiceName, rep.CollectorServiceName()) @@ -108,7 +108,7 @@ func TestBuilderWithCollectors(t *testing.T) { cfg := &Builder{ CollectorHostPorts: hostPorts, } - agent, err := cfg.CreateReporter("", zap.NewNop()) + agent, err := cfg.CreateReporter(nil, zap.NewNop()) assert.NoError(t, err) assert.NotNil(t, agent) diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy.go b/cmd/agent/app/reporter/tchannel/collector_proxy.go index d2a35fe1b2a..2a30e003608 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy.go +++ b/cmd/agent/app/reporter/tchannel/collector_proxy.go @@ -31,7 +31,7 @@ type ProxyBuilder struct { } // NewCollectorProxy creates ProxyBuilder -func NewCollectorProxy(builder *Builder, agentTags string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) { +func NewCollectorProxy(builder *Builder, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) { r, err := builder.CreateReporter(agentTags, logger) if err != nil { return nil, err diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go index e0a8e9b3890..baeb4eb29cc 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go +++ b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go @@ -29,7 +29,7 @@ import ( func TestErrorReporterBuilder(t *testing.T) { tbuilder := NewBuilder().WithDiscoverer(fakeDiscoverer{}) - b, err := NewCollectorProxy(tbuilder, "", metrics.NullFactory, zap.NewNop()) + b, err := NewCollectorProxy(tbuilder, nil, metrics.NullFactory, zap.NewNop()) require.Error(t, err) assert.Nil(t, b) } @@ -38,10 +38,10 @@ func TestCreate(t *testing.T) { cfg := &Builder{} mFactory := metrics.NullFactory logger := zap.NewNop() - b, err := NewCollectorProxy(cfg, "", mFactory, logger) + b, err := NewCollectorProxy(cfg, nil, mFactory, logger) require.NoError(t, err) assert.NotNil(t, b) - r, _ := cfg.CreateReporter("", logger) + r, _ := cfg.CreateReporter(nil, logger) assert.Equal(t, reporter.WrapWithMetrics(r, mFactory), b.GetReporter()) m := tchannel.NewConfigManager(r.CollectorServiceName(), r.Channel()) assert.Equal(t, configmanager.WrapWithMetrics(m, mFactory), b.GetManager()) diff --git a/cmd/agent/app/reporter/tchannel/reporter.go b/cmd/agent/app/reporter/tchannel/reporter.go index d1c3e2f38b1..afcfd6e64c7 100644 --- a/cmd/agent/app/reporter/tchannel/reporter.go +++ b/cmd/agent/app/reporter/tchannel/reporter.go @@ -15,8 +15,6 @@ package tchannel import ( - "os" - "strings" "time" "github.com/uber/tchannel-go" @@ -46,13 +44,13 @@ func New( channel *tchannel.Channel, reportTimeout time.Duration, peerListMgr *peerlistmgr.PeerListManager, - agentTagString string, + agentTagString map[string]string, zlogger *zap.Logger, ) *Reporter { thriftClient := thrift.NewClient(channel, collectorServiceName, nil) zClient := zipkincore.NewTChanZipkinCollectorClient(thriftClient) jClient := jaeger.NewTChanCollectorClient(thriftClient) - agentTags := parseAgentTags(agentTagString) + agentTags := makeJaegerTag(agentTagString) return &Reporter{ channel: channel, zClient: zClient, @@ -137,23 +135,9 @@ func addAgentTags(spans []*jaeger.Span, agentTags []jaeger.Tag) []*jaeger.Span { return spans } -// Parsing logic borrowed from jaegertracing/jaeger-client-go -func parseAgentTags(agentTags string) []jaeger.Tag { - tagPairs := strings.Split(agentTags, ",") +func makeJaegerTag(agentTags map[string]string) []jaeger.Tag { tags := make([]jaeger.Tag, 0) - for _, p := range tagPairs { - kv := strings.SplitN(p, "=", 2) - k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1]) - - if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") { - ed := strings.SplitN(v[2:len(v)-1], ":", 2) - e, d := ed[0], ed[1] - v = os.Getenv(e) - if v == "" && d != "" { - v = d - } - } - + for k, v := range agentTags { tag := jaeger.Tag{ Key: k, VStr: &v, diff --git a/cmd/agent/app/reporter/tchannel/reporter_test.go b/cmd/agent/app/reporter/tchannel/reporter_test.go index 7a7ac216007..dd29f5c5d7e 100644 --- a/cmd/agent/app/reporter/tchannel/reporter_test.go +++ b/cmd/agent/app/reporter/tchannel/reporter_test.go @@ -30,7 +30,7 @@ import ( func initRequirements(t *testing.T) (*metricstest.Factory, *testutils.MockTCollector, *Reporter) { metricsFactory, collector := testutils.InitMockCollector(t) - reporter := New("jaeger-collector", collector.Channel, time.Second, nil, "", zap.NewNop()) + reporter := New("jaeger-collector", collector.Channel, time.Second, nil, nil, zap.NewNop()) return metricsFactory, collector, reporter }