diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 218c033f0b5..ad7636aa408 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -230,7 +230,7 @@ func CreateCollectorProxy( } switch opts.ReporterType { case reporter.GRPC: - return grpc.NewCollectorProxy(grpcRepOpts, mFactory, logger) + return grpc.NewCollectorProxy(grpcRepOpts, opts.AgentTags, mFactory, logger) case reporter.TCHANNEL: return tchannel.NewCollectorProxy(tchanRep, mFactory, logger) default: diff --git a/cmd/agent/app/reporter/flags.go b/cmd/agent/app/reporter/flags.go index 8db51bf9388..c231eaac2d7 100644 --- a/cmd/agent/app/reporter/flags.go +++ b/cmd/agent/app/reporter/flags.go @@ -17,12 +17,17 @@ package reporter import ( "flag" "fmt" + "os" + "strings" "github.com/spf13/viper" ) const ( + // Whether to use grpc or tchannel reporter. reporterType = "reporter.type" + // Agent tags + agentTags = "jaeger.tags" // TCHANNEL is name of tchannel reporter. TCHANNEL Type = "tchannel" // GRPC is name of gRPC reporter. @@ -35,15 +40,44 @@ type Type string // Options holds generic reporter configuration. type Options struct { ReporterType Type + AgentTags map[string]string } // AddFlags adds flags for Options. func AddFlags(flags *flag.FlagSet) { flags.String(reporterType, string(GRPC), fmt.Sprintf("Reporter type to use e.g. %s, %s", string(GRPC), string(TCHANNEL))) + flags.String(agentTags, "", "One or more tags to be added to the Process tags of all spans passing through this agent. Ex: key1=value1,key2=${envVar:defaultValue}") } // InitFromViper initializes Options with properties retrieved from Viper. func (b *Options) InitFromViper(v *viper.Viper) *Options { b.ReporterType = Type(v.GetString(reporterType)) + b.AgentTags = parseAgentTags(v.GetString(agentTags)) return b } + +// Parsing logic borrowed from jaegertracing/jaeger-client-go +func parseAgentTags(agentTags string) map[string]string { + if agentTags == "" { + return nil + } + tagPairs := strings.Split(string(agentTags), ",") + tags := make(map[string]string) + for _, p := range tagPairs { + kv := strings.SplitN(p, "=", 2) + k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1]) + + if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") { + ed := strings.SplitN(string(v[2:len(v)-1]), ":", 2) + e, d := ed[0], ed[1] + v = os.Getenv(e) + if v == "" && d != "" { + v = d + } + } + + tags[k] = v + } + + return tags +} diff --git a/cmd/agent/app/reporter/flags_test.go b/cmd/agent/app/reporter/flags_test.go index 354a240327a..4696e0ec4fe 100644 --- a/cmd/agent/app/reporter/flags_test.go +++ b/cmd/agent/app/reporter/flags_test.go @@ -16,6 +16,7 @@ package reporter import ( "flag" + "os" "testing" "github.com/spf13/cobra" @@ -24,6 +25,25 @@ import ( "github.com/stretchr/testify/require" ) +func TestBindFlags_NoJaegerTags(t *testing.T) { + v := viper.New() + command := cobra.Command{} + flags := &flag.FlagSet{} + AddFlags(flags) + command.PersistentFlags().AddGoFlagSet(flags) + v.BindPFlags(command.PersistentFlags()) + + err := command.ParseFlags([]string{ + "--reporter.type=grpc", + }) + require.NoError(t, err) + + b := &Options{} + b.InitFromViper(v) + assert.Equal(t, Type("grpc"), b.ReporterType) + assert.Len(t, b.AgentTags, 0) +} + func TestBindFlags(t *testing.T) { v := viper.New() command := cobra.Command{} @@ -34,10 +54,21 @@ func TestBindFlags(t *testing.T) { err := command.ParseFlags([]string{ "--reporter.type=grpc", + "--jaeger.tags=key=value,envVar1=${envKey1:defaultVal1},envVar2=${envKey2:defaultVal2}", }) require.NoError(t, err) b := &Options{} + os.Setenv("envKey1", "envVal1") b.InitFromViper(v) + + expectedTags := map[string]string{ + "key" : "value", + "envVar1" : "envVal1", + "envVar2" : "defaultVal2", + } + assert.Equal(t, Type("grpc"), b.ReporterType) + assert.Equal(t, expectedTags, b.AgentTags) + os.Unsetenv("envKey1") } diff --git a/cmd/agent/app/reporter/grpc/collector_proxy.go b/cmd/agent/app/reporter/grpc/collector_proxy.go index fabd2ef6c7b..06fbdd4a18f 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy.go @@ -42,7 +42,7 @@ type ProxyBuilder struct { var systemCertPool = x509.SystemCertPool // to allow overriding in unit test // NewCollectorProxy creates ProxyBuilder -func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) { +func NewCollectorProxy(o *Options, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) { if len(o.CollectorHostPort) == 0 { return nil, errors.New("could not create collector proxy, address is missing") } @@ -87,7 +87,7 @@ func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger) grpcMetrics := mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}}) return &ProxyBuilder{ conn: conn, - reporter: aReporter.WrapWithMetrics(NewReporter(conn, logger), grpcMetrics), + reporter: aReporter.WrapWithMetrics(NewReporter(conn, agentTags, logger), grpcMetrics), manager: configmanager.WrapWithMetrics(grpcManager.NewConfigManager(conn), grpcMetrics)}, nil } diff --git a/cmd/agent/app/reporter/grpc/collector_proxy_test.go b/cmd/agent/app/reporter/grpc/collector_proxy_test.go index e944e56b0ba..c5fecbdb761 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy_test.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy_test.go @@ -50,7 +50,7 @@ iPKnCkzNgxMzQtwdgpAOXIAqXyNibvyOAv1C+3QSMLKbuPEHaIxlCuvl1suX/g25 -----END CERTIFICATE-----` func TestProxyBuilderMissingAddress(t *testing.T) { - proxy, err := NewCollectorProxy(&Options{}, metrics.NullFactory, zap.NewNop()) + proxy, err := NewCollectorProxy(&Options{}, nil, metrics.NullFactory, zap.NewNop()) require.Nil(t, proxy) assert.EqualError(t, err, "could not create collector proxy, address is missing") } @@ -99,7 +99,7 @@ func TestProxyBuilder(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - proxy, err := NewCollectorProxy(test.proxyOptions, metrics.NullFactory, zap.NewNop()) + proxy, err := NewCollectorProxy(test.proxyOptions, nil, metrics.NullFactory, zap.NewNop()) if test.expectError { require.Error(t, err) } else { @@ -125,7 +125,7 @@ func TestSystemCertPoolError(t *testing.T) { _, err := NewCollectorProxy(&Options{ CollectorHostPort: []string{"foo", "bar"}, TLS: true, - }, nil, nil) + }, nil, nil, nil) assert.Equal(t, fakeErr, err) } @@ -142,7 +142,7 @@ func TestMultipleCollectors(t *testing.T) { defer s2.Stop() mFactory := metricstest.NewFactory(time.Microsecond) - proxy, err := NewCollectorProxy(&Options{CollectorHostPort: []string{addr1.String(), addr2.String()}}, mFactory, zap.NewNop()) + proxy, err := NewCollectorProxy(&Options{CollectorHostPort: []string{addr1.String(), addr2.String()}}, nil, mFactory, zap.NewNop()) require.NoError(t, err) require.NotNil(t, proxy) assert.NotNil(t, proxy.GetReporter()) diff --git a/cmd/agent/app/reporter/grpc/reporter.go b/cmd/agent/app/reporter/grpc/reporter.go index 38f3425dee9..0dfcbca09e0 100644 --- a/cmd/agent/app/reporter/grpc/reporter.go +++ b/cmd/agent/app/reporter/grpc/reporter.go @@ -32,14 +32,16 @@ import ( // Reporter reports data to collector over gRPC. type Reporter struct { collector api_v2.CollectorServiceClient + agentTags []model.KeyValue logger *zap.Logger sanitizer zipkin2.Sanitizer } // NewReporter creates gRPC reporter. -func NewReporter(conn *grpc.ClientConn, logger *zap.Logger) *Reporter { +func NewReporter(conn *grpc.ClientConn, agentTags map[string]string, logger *zap.Logger) *Reporter { return &Reporter{ collector: api_v2.NewCollectorServiceClient(conn), + agentTags: makeModelKeyValue(agentTags), logger: logger, sanitizer: zipkin2.NewChainedSanitizer(zipkin2.StandardSanitizers...), } @@ -63,6 +65,7 @@ func (r *Reporter) EmitZipkinBatch(zSpans []*zipkincore.Span) error { } func (r *Reporter) send(spans []*model.Span, process *model.Process) error { + spans, process = addProcessTags(spans, process, r.agentTags) batch := model.Batch{Spans: spans, Process: process} req := &api_v2.PostSpansRequest{Batch: batch} _, err := r.collector.PostSpans(context.Background(), req) @@ -71,3 +74,29 @@ func (r *Reporter) send(spans []*model.Span, process *model.Process) error { } return err } + +// addTags appends jaeger tags for the agent to every span it sends to the collector. +func addProcessTags(spans []*model.Span, process *model.Process, agentTags []model.KeyValue) ([]*model.Span, *model.Process) { + if len(agentTags) == 0 { + return spans, process + } + if process != nil { + process.Tags = append(process.Tags, agentTags...) + } + for _, span := range spans { + if span.Process != nil { + span.Process.Tags = append(span.Process.Tags, agentTags...) + } + } + return spans, process +} + +func makeModelKeyValue(agentTags map[string]string) []model.KeyValue { + tags := make([]model.KeyValue, 0, len(agentTags)) + for k, v := range agentTags { + tag := model.String(k, v) + tags = append(tags, tag) + } + + return tags +} diff --git a/cmd/agent/app/reporter/grpc/reporter_test.go b/cmd/agent/app/reporter/grpc/reporter_test.go index 89d94c5b9e5..51c9eba5719 100644 --- a/cmd/agent/app/reporter/grpc/reporter_test.go +++ b/cmd/agent/app/reporter/grpc/reporter_test.go @@ -58,7 +58,8 @@ func TestReporter_EmitZipkinBatch(t *testing.T) { conn, err := grpc.Dial(addr.String(), grpc.WithInsecure()) defer conn.Close() require.NoError(t, err) - rep := NewReporter(conn, zap.NewNop()) + + rep := NewReporter(conn, nil, zap.NewNop()) tm := time.Unix(158, 0) a := tm.Unix() * 1000 * 1000 @@ -71,7 +72,8 @@ func TestReporter_EmitZipkinBatch(t *testing.T) { {in: &zipkincore.Span{Name: "jonatan", TraceID: 1, ID: 2, Timestamp: &a, Annotations: []*zipkincore.Annotation{{Value: zipkincore.CLIENT_SEND, Host: &zipkincore.Endpoint{ServiceName: "spring"}}}}, expected: model.Batch{ Spans: []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan", Duration: time.Microsecond * 1, - Tags: model.KeyValues{{Key: "span.kind", VStr: "client", VType: model.StringType}}, Process: &model.Process{ServiceName: "spring"}, StartTime: tm.UTC()}}}}, + Tags: model.KeyValues{{Key: "span.kind", VStr: "client", VType: model.StringType}}, + Process: &model.Process{ServiceName: "spring"}, StartTime: tm.UTC()}}}}, } for _, test := range tests { err = rep.EmitZipkinBatch([]*zipkincore.Span{test.in}) @@ -93,7 +95,7 @@ func TestReporter_EmitBatch(t *testing.T) { conn, err := grpc.Dial(addr.String(), grpc.WithInsecure()) defer conn.Close() require.NoError(t, err) - rep := NewReporter(conn, zap.NewNop()) + rep := NewReporter(conn, nil, zap.NewNop()) tm := time.Unix(158, 0) tests := []struct { @@ -118,7 +120,50 @@ func TestReporter_EmitBatch(t *testing.T) { func TestReporter_SendFailure(t *testing.T) { conn, err := grpc.Dial("", grpc.WithInsecure()) require.NoError(t, err) - rep := NewReporter(conn, zap.NewNop()) + rep := NewReporter(conn, nil, zap.NewNop()) err = rep.send(nil, nil) assert.EqualError(t, err, "rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = \"transport: Error while dialing dial tcp: missing address\"") } + +func TestReporter_AddProcessTags_EmptyTags(t *testing.T) { + tags := map[string]string{} + spans := []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan"}} + actualSpans, _ := addProcessTags(spans, nil, makeModelKeyValue(tags)) + assert.Equal(t, spans, actualSpans) +} + +func TestReporter_AddProcessTags_ZipkinBatch(t *testing.T) { + tags := map[string]string{ "key" : "value" } + spans := []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan", Process: &model.Process{ServiceName: "spring"}}} + + expectedSpans := []*model.Span{ + { + TraceID: model.NewTraceID(0, 1), + SpanID: model.NewSpanID(2), + OperationName: "jonatan", + Process: &model.Process{ServiceName: "spring", Tags: []model.KeyValue{model.String("key", "value")}}, + }, + } + actualSpans, _ := addProcessTags(spans, nil, makeModelKeyValue(tags)) + + assert.Equal(t, expectedSpans, actualSpans) +} + +func TestReporter_AddProcessTags_JaegerBatch(t *testing.T) { + tags := map[string]string{ "key" : "value" } + spans := []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan"}} + process := &model.Process{ServiceName: "spring"} + + expectedProcess := &model.Process{ServiceName: "spring", Tags: []model.KeyValue{model.String("key", "value")}} + _, actualProcess := addProcessTags(spans, process, makeModelKeyValue(tags)) + + assert.Equal(t, expectedProcess, actualProcess) +} + +func TestReporter_MakeModelKeyValue(t *testing.T) { + expectedTags := []model.KeyValue{model.String("key", "value")} + stringTags := map[string]string{ "key" : "value" } + actualTags := makeModelKeyValue(stringTags) + + assert.Equal(t, expectedTags, actualTags) +} diff --git a/cmd/agent/app/reporter/metrics_test.go b/cmd/agent/app/reporter/metrics_test.go index d8abac22e5c..151488994a3 100644 --- a/cmd/agent/app/reporter/metrics_test.go +++ b/cmd/agent/app/reporter/metrics_test.go @@ -104,8 +104,7 @@ func TestMetricsReporter(t *testing.T) { err := reporter.EmitBatch(&jaeger.Batch{Spans: []*jaeger.Span{{}}}) require.Error(t, err) }, rep: &noopReporter{err: errors.New("foo")}}, - {expectedCounters: - []metricstest.ExpectedMetric{ + {expectedCounters: []metricstest.ExpectedMetric{ {Name: "reporter.batches.failures", Tags: map[string]string{"format": "zipkin"}, Value: 1}, {Name: "reporter.spans.failures", Tags: map[string]string{"format": "zipkin"}, Value: 2}, }, expectedGauges: []metricstest.ExpectedMetric{