Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for agent level tag #1396

Merged
merged 18 commits into from
Mar 18, 2019
2 changes: 1 addition & 1 deletion cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func CreateCollectorProxy(
}
switch opts.ReporterType {
case reporter.GRPC:
return grpc.NewCollectorProxy(grpcRepOpts, mFactory, logger)
return grpc.NewCollectorProxy(grpcRepOpts, opts.AgentTags, mFactory, logger)
case reporter.TCHANNEL:
return tchannel.NewCollectorProxy(tchanRep, mFactory, logger)
default:
Expand Down
34 changes: 34 additions & 0 deletions cmd/agent/app/reporter/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ package reporter
import (
"flag"
"fmt"
"os"
"strings"

"github.com/spf13/viper"
)

const (
// Whether to use grpc or tchannel reporter.
reporterType = "reporter.type"
// Agent tags
agentTags = "jaeger.tags"
// TCHANNEL is name of tchannel reporter.
TCHANNEL Type = "tchannel"
// GRPC is name of gRPC reporter.
Expand All @@ -35,15 +40,44 @@ type Type string
// Options holds generic reporter configuration.
type Options struct {
ReporterType Type
AgentTags map[string]string
}

// AddFlags adds flags for Options.
func AddFlags(flags *flag.FlagSet) {
flags.String(reporterType, string(GRPC), fmt.Sprintf("Reporter type to use e.g. %s, %s", string(GRPC), string(TCHANNEL)))
flags.String(agentTags, "", "One or more tags to be added to the Process tags of all spans passing through this agent. Ex: key1=value1,key2=${envVar:defaultValue}")
}

// InitFromViper initializes Options with properties retrieved from Viper.
func (b *Options) InitFromViper(v *viper.Viper) *Options {
b.ReporterType = Type(v.GetString(reporterType))
b.AgentTags = parseAgentTags(v.GetString(agentTags))
return b
}

// Parsing logic borrowed from jaegertracing/jaeger-client-go
func parseAgentTags(agentTags string) map[string]string {
if agentTags == "" {
return nil
}
tagPairs := strings.Split(string(agentTags), ",")
tags := make(map[string]string)
for _, p := range tagPairs {
kv := strings.SplitN(p, "=", 2)
k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1])

if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") {
ed := strings.SplitN(string(v[2:len(v)-1]), ":", 2)
e, d := ed[0], ed[1]
v = os.Getenv(e)
if v == "" && d != "" {
v = d
}
}

tags[k] = v
}

return tags
}
31 changes: 31 additions & 0 deletions cmd/agent/app/reporter/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package reporter

import (
"flag"
"os"
"testing"

"github.com/spf13/cobra"
Expand All @@ -24,6 +25,25 @@ import (
"github.com/stretchr/testify/require"
)

func TestBindFlags_NoJaegerTags(t *testing.T) {
v := viper.New()
command := cobra.Command{}
flags := &flag.FlagSet{}
AddFlags(flags)
command.PersistentFlags().AddGoFlagSet(flags)
v.BindPFlags(command.PersistentFlags())

err := command.ParseFlags([]string{
"--reporter.type=grpc",
})
require.NoError(t, err)

b := &Options{}
b.InitFromViper(v)
assert.Equal(t, Type("grpc"), b.ReporterType)
assert.Len(t, b.AgentTags, 0)
}

func TestBindFlags(t *testing.T) {
v := viper.New()
command := cobra.Command{}
Expand All @@ -34,10 +54,21 @@ func TestBindFlags(t *testing.T) {

err := command.ParseFlags([]string{
"--reporter.type=grpc",
"--jaeger.tags=key=value,envVar1=${envKey1:defaultVal1},envVar2=${envKey2:defaultVal2}",
})
require.NoError(t, err)

b := &Options{}
os.Setenv("envKey1", "envVal1")
b.InitFromViper(v)

expectedTags := map[string]string{
"key" : "value",
"envVar1" : "envVal1",
"envVar2" : "defaultVal2",
}

assert.Equal(t, Type("grpc"), b.ReporterType)
assert.Equal(t, expectedTags, b.AgentTags)
os.Unsetenv("envKey1")
}
4 changes: 2 additions & 2 deletions cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ProxyBuilder struct {
var systemCertPool = x509.SystemCertPool // to allow overriding in unit test

// NewCollectorProxy creates ProxyBuilder
func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
func NewCollectorProxy(o *Options, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
if len(o.CollectorHostPort) == 0 {
return nil, errors.New("could not create collector proxy, address is missing")
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger)
grpcMetrics := mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}})
return &ProxyBuilder{
conn: conn,
reporter: aReporter.WrapWithMetrics(NewReporter(conn, logger), grpcMetrics),
reporter: aReporter.WrapWithMetrics(NewReporter(conn, agentTags, logger), grpcMetrics),
manager: configmanager.WrapWithMetrics(grpcManager.NewConfigManager(conn), grpcMetrics)}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/agent/app/reporter/grpc/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ iPKnCkzNgxMzQtwdgpAOXIAqXyNibvyOAv1C+3QSMLKbuPEHaIxlCuvl1suX/g25
-----END CERTIFICATE-----`

func TestProxyBuilderMissingAddress(t *testing.T) {
proxy, err := NewCollectorProxy(&Options{}, metrics.NullFactory, zap.NewNop())
proxy, err := NewCollectorProxy(&Options{}, nil, metrics.NullFactory, zap.NewNop())
require.Nil(t, proxy)
assert.EqualError(t, err, "could not create collector proxy, address is missing")
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestProxyBuilder(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
proxy, err := NewCollectorProxy(test.proxyOptions, metrics.NullFactory, zap.NewNop())
proxy, err := NewCollectorProxy(test.proxyOptions, nil, metrics.NullFactory, zap.NewNop())
if test.expectError {
require.Error(t, err)
} else {
Expand All @@ -125,7 +125,7 @@ func TestSystemCertPoolError(t *testing.T) {
_, err := NewCollectorProxy(&Options{
CollectorHostPort: []string{"foo", "bar"},
TLS: true,
}, nil, nil)
}, nil, nil, nil)
assert.Equal(t, fakeErr, err)
}

Expand All @@ -142,7 +142,7 @@ func TestMultipleCollectors(t *testing.T) {
defer s2.Stop()

mFactory := metricstest.NewFactory(time.Microsecond)
proxy, err := NewCollectorProxy(&Options{CollectorHostPort: []string{addr1.String(), addr2.String()}}, mFactory, zap.NewNop())
proxy, err := NewCollectorProxy(&Options{CollectorHostPort: []string{addr1.String(), addr2.String()}}, nil, mFactory, zap.NewNop())
require.NoError(t, err)
require.NotNil(t, proxy)
assert.NotNil(t, proxy.GetReporter())
Expand Down
31 changes: 30 additions & 1 deletion cmd/agent/app/reporter/grpc/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ import (
// Reporter reports data to collector over gRPC.
type Reporter struct {
collector api_v2.CollectorServiceClient
agentTags []model.KeyValue
logger *zap.Logger
sanitizer zipkin2.Sanitizer
}

// NewReporter creates gRPC reporter.
func NewReporter(conn *grpc.ClientConn, logger *zap.Logger) *Reporter {
func NewReporter(conn *grpc.ClientConn, agentTags map[string]string, logger *zap.Logger) *Reporter {
return &Reporter{
collector: api_v2.NewCollectorServiceClient(conn),
agentTags: makeModelKeyValue(agentTags),
logger: logger,
sanitizer: zipkin2.NewChainedSanitizer(zipkin2.StandardSanitizers...),
}
Expand All @@ -63,6 +65,7 @@ func (r *Reporter) EmitZipkinBatch(zSpans []*zipkincore.Span) error {
}

func (r *Reporter) send(spans []*model.Span, process *model.Process) error {
spans, process = addProcessTags(spans, process, r.agentTags)
batch := model.Batch{Spans: spans, Process: process}
req := &api_v2.PostSpansRequest{Batch: batch}
_, err := r.collector.PostSpans(context.Background(), req)
Expand All @@ -71,3 +74,29 @@ func (r *Reporter) send(spans []*model.Span, process *model.Process) error {
}
return err
}

// addTags appends jaeger tags for the agent to every span it sends to the collector.
func addProcessTags(spans []*model.Span, process *model.Process, agentTags []model.KeyValue) ([]*model.Span, *model.Process) {
if len(agentTags) == 0 {
return spans, process
}
if process != nil {
process.Tags = append(process.Tags, agentTags...)
}
for _, span := range spans {
if span.Process != nil {
span.Process.Tags = append(span.Process.Tags, agentTags...)
}
}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
return spans, process
}

func makeModelKeyValue(agentTags map[string]string) []model.KeyValue {
tags := make([]model.KeyValue, 0, len(agentTags))
for k, v := range agentTags {
tag := model.String(k, v)
tags = append(tags, tag)
}

return tags
}
53 changes: 49 additions & 4 deletions cmd/agent/app/reporter/grpc/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func TestReporter_EmitZipkinBatch(t *testing.T) {
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
defer conn.Close()
require.NoError(t, err)
rep := NewReporter(conn, zap.NewNop())

rep := NewReporter(conn, nil, zap.NewNop())

tm := time.Unix(158, 0)
a := tm.Unix() * 1000 * 1000
Expand All @@ -71,7 +72,8 @@ func TestReporter_EmitZipkinBatch(t *testing.T) {
{in: &zipkincore.Span{Name: "jonatan", TraceID: 1, ID: 2, Timestamp: &a, Annotations: []*zipkincore.Annotation{{Value: zipkincore.CLIENT_SEND, Host: &zipkincore.Endpoint{ServiceName: "spring"}}}},
expected: model.Batch{
Spans: []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan", Duration: time.Microsecond * 1,
Tags: model.KeyValues{{Key: "span.kind", VStr: "client", VType: model.StringType}}, Process: &model.Process{ServiceName: "spring"}, StartTime: tm.UTC()}}}},
Tags: model.KeyValues{{Key: "span.kind", VStr: "client", VType: model.StringType}},
Process: &model.Process{ServiceName: "spring"}, StartTime: tm.UTC()}}}},
}
for _, test := range tests {
err = rep.EmitZipkinBatch([]*zipkincore.Span{test.in})
Expand All @@ -93,7 +95,7 @@ func TestReporter_EmitBatch(t *testing.T) {
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
defer conn.Close()
require.NoError(t, err)
rep := NewReporter(conn, zap.NewNop())
rep := NewReporter(conn, nil, zap.NewNop())

tm := time.Unix(158, 0)
tests := []struct {
Expand All @@ -118,7 +120,50 @@ func TestReporter_EmitBatch(t *testing.T) {
func TestReporter_SendFailure(t *testing.T) {
conn, err := grpc.Dial("", grpc.WithInsecure())
require.NoError(t, err)
rep := NewReporter(conn, zap.NewNop())
rep := NewReporter(conn, nil, zap.NewNop())
err = rep.send(nil, nil)
assert.EqualError(t, err, "rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = \"transport: Error while dialing dial tcp: missing address\"")
}

func TestReporter_AddProcessTags_EmptyTags(t *testing.T) {
tags := map[string]string{}
spans := []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan"}}
actualSpans, _ := addProcessTags(spans, nil, makeModelKeyValue(tags))
assert.Equal(t, spans, actualSpans)
}

func TestReporter_AddProcessTags_ZipkinBatch(t *testing.T) {
tags := map[string]string{ "key" : "value" }
spans := []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan", Process: &model.Process{ServiceName: "spring"}}}

expectedSpans := []*model.Span{
{
TraceID: model.NewTraceID(0, 1),
SpanID: model.NewSpanID(2),
OperationName: "jonatan",
Process: &model.Process{ServiceName: "spring", Tags: []model.KeyValue{model.String("key", "value")}},
},
}
actualSpans, _ := addProcessTags(spans, nil, makeModelKeyValue(tags))

assert.Equal(t, expectedSpans, actualSpans)
}

func TestReporter_AddProcessTags_JaegerBatch(t *testing.T) {
tags := map[string]string{ "key" : "value" }
spans := []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan"}}
process := &model.Process{ServiceName: "spring"}

expectedProcess := &model.Process{ServiceName: "spring", Tags: []model.KeyValue{model.String("key", "value")}}
_, actualProcess := addProcessTags(spans, process, makeModelKeyValue(tags))

assert.Equal(t, expectedProcess, actualProcess)
}

func TestReporter_MakeModelKeyValue(t *testing.T) {
expectedTags := []model.KeyValue{model.String("key", "value")}
stringTags := map[string]string{ "key" : "value" }
actualTags := makeModelKeyValue(stringTags)

assert.Equal(t, expectedTags, actualTags)
}
3 changes: 1 addition & 2 deletions cmd/agent/app/reporter/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ func TestMetricsReporter(t *testing.T) {
err := reporter.EmitBatch(&jaeger.Batch{Spans: []*jaeger.Span{{}}})
require.Error(t, err)
}, rep: &noopReporter{err: errors.New("foo")}},
{expectedCounters:
[]metricstest.ExpectedMetric{
{expectedCounters: []metricstest.ExpectedMetric{
{Name: "reporter.batches.failures", Tags: map[string]string{"format": "zipkin"}, Value: 1},
{Name: "reporter.spans.failures", Tags: map[string]string{"format": "zipkin"}, Value: 2},
}, expectedGauges: []metricstest.ExpectedMetric{
Expand Down