Skip to content

Commit

Permalink
Adds support for agent level tag (#1396)
Browse files Browse the repository at this point in the history
* Basic POC for agent based tags

Signed-off-by: Annanay <annanay.a@media.net>

* Address comments

Signed-off-by: Annanay <annanay.a@media.net>

* Rebased

Signed-off-by: Annanay <annanay.a@media.net>

* Fix tests and build errors

Signed-off-by: Annanay <annanay.a@media.net>

* Address comments

Signed-off-by: Annanay <annanay.a@media.net>

* nit, fix tests

Signed-off-by: Annanay <annanay.a@media.net>

nit, fix function call

Signed-off-by: Annanay <annanay.a@media.net>

nit, fix function call

Signed-off-by: Annanay <annanay.a@media.net>

fix fmt, gosimple

Signed-off-by: Annanay <annanay.a@media.net>

make fmt

* Start adding tests

Signed-off-by: Annanay <annanay.a@media.net>

Fix tag names

Signed-off-by: Annanay <annanay.a@media.net>

* Fix TestCollectorProxy test

Signed-off-by: Annanay <annanay.a@media.net>

* Tags are added in send, not in EmitZipkinBatch

Signed-off-by: Annanay <annanay.a@media.net>

* Addressed comments, added tests

Signed-off-by: Annanay <annanay.a@media.net>

* Remove debug print

Signed-off-by: Annanay <annanay.a@media.net>

* Remove all changes to tchannel reporter

Signed-off-by: Annanay <annanay.a@media.net>

* Add tests, fix code coverage

Signed-off-by: Annanay <annanay.a@media.net>

* Address comments, run make fmt

Signed-off-by: Annanay <annanay.a@media.net>

* Addressed comments

Signed-off-by: Annanay <annanay.a@media.net>

* Address comments

Signed-off-by: Annanay <annanay.a@media.net>

* Address comments

Signed-off-by: Annanay <annanay.a@media.net>
  • Loading branch information
annanay25 authored and yurishkuro committed Mar 18, 2019
1 parent 1157a7d commit 8e76a50
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 14 deletions.
2 changes: 1 addition & 1 deletion cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func CreateCollectorProxy(
}
switch opts.ReporterType {
case reporter.GRPC:
return grpc.NewCollectorProxy(grpcRepOpts, mFactory, logger)
return grpc.NewCollectorProxy(grpcRepOpts, opts.AgentTags, mFactory, logger)
case reporter.TCHANNEL:
return tchannel.NewCollectorProxy(tchanRep, mFactory, logger)
default:
Expand Down
34 changes: 34 additions & 0 deletions cmd/agent/app/reporter/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ package reporter
import (
"flag"
"fmt"
"os"
"strings"

"github.com/spf13/viper"
)

const (
// Whether to use grpc or tchannel reporter.
reporterType = "reporter.type"
// Agent tags
agentTags = "jaeger.tags"
// TCHANNEL is name of tchannel reporter.
TCHANNEL Type = "tchannel"
// GRPC is name of gRPC reporter.
Expand All @@ -35,15 +40,44 @@ type Type string
// Options holds generic reporter configuration.
type Options struct {
ReporterType Type
AgentTags map[string]string
}

// AddFlags adds flags for Options.
func AddFlags(flags *flag.FlagSet) {
flags.String(reporterType, string(GRPC), fmt.Sprintf("Reporter type to use e.g. %s, %s", string(GRPC), string(TCHANNEL)))
flags.String(agentTags, "", "One or more tags to be added to the Process tags of all spans passing through this agent. Ex: key1=value1,key2=${envVar:defaultValue}")
}

// InitFromViper initializes Options with properties retrieved from Viper.
func (b *Options) InitFromViper(v *viper.Viper) *Options {
b.ReporterType = Type(v.GetString(reporterType))
b.AgentTags = parseAgentTags(v.GetString(agentTags))
return b
}

// Parsing logic borrowed from jaegertracing/jaeger-client-go
func parseAgentTags(agentTags string) map[string]string {
if agentTags == "" {
return nil
}
tagPairs := strings.Split(string(agentTags), ",")
tags := make(map[string]string)
for _, p := range tagPairs {
kv := strings.SplitN(p, "=", 2)
k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1])

if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") {
ed := strings.SplitN(string(v[2:len(v)-1]), ":", 2)
e, d := ed[0], ed[1]
v = os.Getenv(e)
if v == "" && d != "" {
v = d
}
}

tags[k] = v
}

return tags
}
31 changes: 31 additions & 0 deletions cmd/agent/app/reporter/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package reporter

import (
"flag"
"os"
"testing"

"github.com/spf13/cobra"
Expand All @@ -24,6 +25,25 @@ import (
"github.com/stretchr/testify/require"
)

func TestBindFlags_NoJaegerTags(t *testing.T) {
v := viper.New()
command := cobra.Command{}
flags := &flag.FlagSet{}
AddFlags(flags)
command.PersistentFlags().AddGoFlagSet(flags)
v.BindPFlags(command.PersistentFlags())

err := command.ParseFlags([]string{
"--reporter.type=grpc",
})
require.NoError(t, err)

b := &Options{}
b.InitFromViper(v)
assert.Equal(t, Type("grpc"), b.ReporterType)
assert.Len(t, b.AgentTags, 0)
}

func TestBindFlags(t *testing.T) {
v := viper.New()
command := cobra.Command{}
Expand All @@ -34,10 +54,21 @@ func TestBindFlags(t *testing.T) {

err := command.ParseFlags([]string{
"--reporter.type=grpc",
"--jaeger.tags=key=value,envVar1=${envKey1:defaultVal1},envVar2=${envKey2:defaultVal2}",
})
require.NoError(t, err)

b := &Options{}
os.Setenv("envKey1", "envVal1")
b.InitFromViper(v)

expectedTags := map[string]string{
"key" : "value",
"envVar1" : "envVal1",
"envVar2" : "defaultVal2",
}

assert.Equal(t, Type("grpc"), b.ReporterType)
assert.Equal(t, expectedTags, b.AgentTags)
os.Unsetenv("envKey1")
}
4 changes: 2 additions & 2 deletions cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ProxyBuilder struct {
var systemCertPool = x509.SystemCertPool // to allow overriding in unit test

// NewCollectorProxy creates ProxyBuilder
func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
func NewCollectorProxy(o *Options, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
if len(o.CollectorHostPort) == 0 {
return nil, errors.New("could not create collector proxy, address is missing")
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger)
grpcMetrics := mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}})
return &ProxyBuilder{
conn: conn,
reporter: aReporter.WrapWithMetrics(NewReporter(conn, logger), grpcMetrics),
reporter: aReporter.WrapWithMetrics(NewReporter(conn, agentTags, logger), grpcMetrics),
manager: configmanager.WrapWithMetrics(grpcManager.NewConfigManager(conn), grpcMetrics)}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/agent/app/reporter/grpc/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ iPKnCkzNgxMzQtwdgpAOXIAqXyNibvyOAv1C+3QSMLKbuPEHaIxlCuvl1suX/g25
-----END CERTIFICATE-----`

func TestProxyBuilderMissingAddress(t *testing.T) {
proxy, err := NewCollectorProxy(&Options{}, metrics.NullFactory, zap.NewNop())
proxy, err := NewCollectorProxy(&Options{}, nil, metrics.NullFactory, zap.NewNop())
require.Nil(t, proxy)
assert.EqualError(t, err, "could not create collector proxy, address is missing")
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestProxyBuilder(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
proxy, err := NewCollectorProxy(test.proxyOptions, metrics.NullFactory, zap.NewNop())
proxy, err := NewCollectorProxy(test.proxyOptions, nil, metrics.NullFactory, zap.NewNop())
if test.expectError {
require.Error(t, err)
} else {
Expand All @@ -125,7 +125,7 @@ func TestSystemCertPoolError(t *testing.T) {
_, err := NewCollectorProxy(&Options{
CollectorHostPort: []string{"foo", "bar"},
TLS: true,
}, nil, nil)
}, nil, nil, nil)
assert.Equal(t, fakeErr, err)
}

Expand All @@ -142,7 +142,7 @@ func TestMultipleCollectors(t *testing.T) {
defer s2.Stop()

mFactory := metricstest.NewFactory(time.Microsecond)
proxy, err := NewCollectorProxy(&Options{CollectorHostPort: []string{addr1.String(), addr2.String()}}, mFactory, zap.NewNop())
proxy, err := NewCollectorProxy(&Options{CollectorHostPort: []string{addr1.String(), addr2.String()}}, nil, mFactory, zap.NewNop())
require.NoError(t, err)
require.NotNil(t, proxy)
assert.NotNil(t, proxy.GetReporter())
Expand Down
31 changes: 30 additions & 1 deletion cmd/agent/app/reporter/grpc/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ import (
// Reporter reports data to collector over gRPC.
type Reporter struct {
collector api_v2.CollectorServiceClient
agentTags []model.KeyValue
logger *zap.Logger
sanitizer zipkin2.Sanitizer
}

// NewReporter creates gRPC reporter.
func NewReporter(conn *grpc.ClientConn, logger *zap.Logger) *Reporter {
func NewReporter(conn *grpc.ClientConn, agentTags map[string]string, logger *zap.Logger) *Reporter {
return &Reporter{
collector: api_v2.NewCollectorServiceClient(conn),
agentTags: makeModelKeyValue(agentTags),
logger: logger,
sanitizer: zipkin2.NewChainedSanitizer(zipkin2.StandardSanitizers...),
}
Expand All @@ -63,6 +65,7 @@ func (r *Reporter) EmitZipkinBatch(zSpans []*zipkincore.Span) error {
}

func (r *Reporter) send(spans []*model.Span, process *model.Process) error {
spans, process = addProcessTags(spans, process, r.agentTags)
batch := model.Batch{Spans: spans, Process: process}
req := &api_v2.PostSpansRequest{Batch: batch}
_, err := r.collector.PostSpans(context.Background(), req)
Expand All @@ -71,3 +74,29 @@ func (r *Reporter) send(spans []*model.Span, process *model.Process) error {
}
return err
}

// addTags appends jaeger tags for the agent to every span it sends to the collector.
func addProcessTags(spans []*model.Span, process *model.Process, agentTags []model.KeyValue) ([]*model.Span, *model.Process) {
if len(agentTags) == 0 {
return spans, process
}
if process != nil {
process.Tags = append(process.Tags, agentTags...)
}
for _, span := range spans {
if span.Process != nil {
span.Process.Tags = append(span.Process.Tags, agentTags...)
}
}
return spans, process
}

func makeModelKeyValue(agentTags map[string]string) []model.KeyValue {
tags := make([]model.KeyValue, 0, len(agentTags))
for k, v := range agentTags {
tag := model.String(k, v)
tags = append(tags, tag)
}

return tags
}
53 changes: 49 additions & 4 deletions cmd/agent/app/reporter/grpc/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func TestReporter_EmitZipkinBatch(t *testing.T) {
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
defer conn.Close()
require.NoError(t, err)
rep := NewReporter(conn, zap.NewNop())

rep := NewReporter(conn, nil, zap.NewNop())

tm := time.Unix(158, 0)
a := tm.Unix() * 1000 * 1000
Expand All @@ -71,7 +72,8 @@ func TestReporter_EmitZipkinBatch(t *testing.T) {
{in: &zipkincore.Span{Name: "jonatan", TraceID: 1, ID: 2, Timestamp: &a, Annotations: []*zipkincore.Annotation{{Value: zipkincore.CLIENT_SEND, Host: &zipkincore.Endpoint{ServiceName: "spring"}}}},
expected: model.Batch{
Spans: []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan", Duration: time.Microsecond * 1,
Tags: model.KeyValues{{Key: "span.kind", VStr: "client", VType: model.StringType}}, Process: &model.Process{ServiceName: "spring"}, StartTime: tm.UTC()}}}},
Tags: model.KeyValues{{Key: "span.kind", VStr: "client", VType: model.StringType}},
Process: &model.Process{ServiceName: "spring"}, StartTime: tm.UTC()}}}},
}
for _, test := range tests {
err = rep.EmitZipkinBatch([]*zipkincore.Span{test.in})
Expand All @@ -93,7 +95,7 @@ func TestReporter_EmitBatch(t *testing.T) {
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
defer conn.Close()
require.NoError(t, err)
rep := NewReporter(conn, zap.NewNop())
rep := NewReporter(conn, nil, zap.NewNop())

tm := time.Unix(158, 0)
tests := []struct {
Expand All @@ -118,7 +120,50 @@ func TestReporter_EmitBatch(t *testing.T) {
func TestReporter_SendFailure(t *testing.T) {
conn, err := grpc.Dial("", grpc.WithInsecure())
require.NoError(t, err)
rep := NewReporter(conn, zap.NewNop())
rep := NewReporter(conn, nil, zap.NewNop())
err = rep.send(nil, nil)
assert.EqualError(t, err, "rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = \"transport: Error while dialing dial tcp: missing address\"")
}

func TestReporter_AddProcessTags_EmptyTags(t *testing.T) {
tags := map[string]string{}
spans := []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan"}}
actualSpans, _ := addProcessTags(spans, nil, makeModelKeyValue(tags))
assert.Equal(t, spans, actualSpans)
}

func TestReporter_AddProcessTags_ZipkinBatch(t *testing.T) {
tags := map[string]string{ "key" : "value" }
spans := []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan", Process: &model.Process{ServiceName: "spring"}}}

expectedSpans := []*model.Span{
{
TraceID: model.NewTraceID(0, 1),
SpanID: model.NewSpanID(2),
OperationName: "jonatan",
Process: &model.Process{ServiceName: "spring", Tags: []model.KeyValue{model.String("key", "value")}},
},
}
actualSpans, _ := addProcessTags(spans, nil, makeModelKeyValue(tags))

assert.Equal(t, expectedSpans, actualSpans)
}

func TestReporter_AddProcessTags_JaegerBatch(t *testing.T) {
tags := map[string]string{ "key" : "value" }
spans := []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan"}}
process := &model.Process{ServiceName: "spring"}

expectedProcess := &model.Process{ServiceName: "spring", Tags: []model.KeyValue{model.String("key", "value")}}
_, actualProcess := addProcessTags(spans, process, makeModelKeyValue(tags))

assert.Equal(t, expectedProcess, actualProcess)
}

func TestReporter_MakeModelKeyValue(t *testing.T) {
expectedTags := []model.KeyValue{model.String("key", "value")}
stringTags := map[string]string{ "key" : "value" }
actualTags := makeModelKeyValue(stringTags)

assert.Equal(t, expectedTags, actualTags)
}
3 changes: 1 addition & 2 deletions cmd/agent/app/reporter/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ func TestMetricsReporter(t *testing.T) {
err := reporter.EmitBatch(&jaeger.Batch{Spans: []*jaeger.Span{{}}})
require.Error(t, err)
}, rep: &noopReporter{err: errors.New("foo")}},
{expectedCounters:
[]metricstest.ExpectedMetric{
{expectedCounters: []metricstest.ExpectedMetric{
{Name: "reporter.batches.failures", Tags: map[string]string{"format": "zipkin"}, Value: 1},
{Name: "reporter.spans.failures", Tags: map[string]string{"format": "zipkin"}, Value: 2},
}, expectedGauges: []metricstest.ExpectedMetric{
Expand Down

0 comments on commit 8e76a50

Please sign in to comment.