Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Annanay <annanay.a@media.net>
  • Loading branch information
Annanay committed Mar 4, 2019
1 parent 1943524 commit 15d2b20
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 60 deletions.
34 changes: 30 additions & 4 deletions cmd/agent/app/reporter/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@ package reporter
import (
"flag"
"fmt"
"os"
"strings"

"github.com/spf13/viper"
)

const (
// Whether to use grpc or tchannel reporter.
reporterType = "reporter.type"
// Agent tags
agentTags = "jaeger.tags"
// TCHANNEL is name of tchannel reporter.
TCHANNEL Type = "tchannel"
// GRPC is name of gRPC reporter.
GRPC Type = "grpc"
// Agent tags
agentTags = "jaeger.tags"
)

// Type defines type of reporter.
Expand All @@ -37,7 +40,7 @@ type Type string
// Options holds generic reporter configuration.
type Options struct {
ReporterType Type
AgentTags Type
AgentTags map[Type]Type
}

// AddFlags adds flags for Options.
Expand All @@ -49,6 +52,29 @@ func AddFlags(flags *flag.FlagSet) {
// InitFromViper initializes Options with properties retrieved from Viper.
func (b *Options) InitFromViper(v *viper.Viper) *Options {
b.ReporterType = Type(v.GetString(reporterType))
b.AgentTags = Type(v.GetString(agentTags))
b.AgentTags = parseAgentTags(Type(v.GetString(agentTags)))
return b
}

// Parsing logic borrowed from jaegertracing/jaeger-client-go
func parseAgentTags(agentTags Type) map[Type]Type {
tagPairs := strings.Split(string(agentTags), ",")
var tags map[Type]Type
for _, p := range tagPairs {
kv := strings.SplitN(p, "=", 2)
k, v := Type(strings.TrimSpace(kv[0])), Type(strings.TrimSpace(kv[1]))

if strings.HasPrefix(string(v), "${") && strings.HasSuffix(string(v), "}") {
ed := strings.SplitN(string(v[2:len(v)-1]), ":", 2)
e, d := ed[0], ed[1]
v = Type(os.Getenv(e))
if v == "" && d != "" {
v = Type(d)
}
}

tags[k] = v
}

return tags
}
4 changes: 3 additions & 1 deletion cmd/agent/app/reporter/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ func TestBindFlags(t *testing.T) {

err := command.ParseFlags([]string{
"--reporter.type=grpc",
"--jaeger.tag key=value",
})
require.NoError(t, err)

b := &Options{}
b.InitFromViper(v)
assert.Equal(t, Type("grpc"), b.ReporterType)
}
assert.Equal(t, parseAgentTags(Type("key=value")), b.AgentTags)
}
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ProxyBuilder struct {
var systemCertPool = x509.SystemCertPool // to allow overriding in unit test

// NewCollectorProxy creates ProxyBuilder
func NewCollectorProxy(o *Options, agentTags string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
func NewCollectorProxy(o *Options, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
if len(o.CollectorHostPort) == 0 {
return nil, errors.New("could not create collector proxy, address is missing")
}
Expand Down
27 changes: 7 additions & 20 deletions cmd/agent/app/reporter/grpc/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ type Reporter struct {
}

// NewReporter creates gRPC reporter.
func NewReporter(conn *grpc.ClientConn, agentTags string, logger *zap.Logger) *Reporter {
func NewReporter(conn *grpc.ClientConn, agentTags map[string]string, logger *zap.Logger) *Reporter {
return &Reporter{
collector: api_v2.NewCollectorServiceClient(conn),
logger: logger,
sanitizer: zipkin2.NewChainedSanitizer(zipkin2.StandardSanitizers...),
agentTags: parseAgentTags(agentTags),
agentTags: makeModelKeyValue(agentTags),
}
}

// EmitBatch implements EmitBatch() of Reporter
func (r *Reporter) EmitBatch(b *thrift.Batch) error {
return r.send(addAgentTags(jConverter.ToDomain(b.Spans, nil), r.agentTags), jConverter.ToDomainProcess(b.Process))
return r.send(jConverter.ToDomain(b.Spans, nil), jConverter.ToDomainProcess(b.Process))
}

// EmitZipkinBatch implements EmitZipkinBatch() of Reporter
Expand All @@ -61,10 +61,11 @@ func (r *Reporter) EmitZipkinBatch(zSpans []*zipkincore.Span) error {
if err != nil {
return err
}
return r.send(addAgentTags(trace.Spans, r.agentTags), nil)
return r.send(trace.Spans, nil)
}

func (r *Reporter) send(spans []*model.Span, process *model.Process) error {
spans = addAgentTags(spans, r.agentTags)
batch := model.Batch{Spans: spans, Process: process}
req := &api_v2.PostSpansRequest{Batch: batch}
_, err := r.collector.PostSpans(context.Background(), req)
Expand All @@ -84,23 +85,9 @@ func addAgentTags(spans []*model.Span, agentTags []model.KeyValue) []*model.Span
return spans
}

// Parsing logic borrowed from jaegertracing/jaeger-client-go
func parseAgentTags(agentTags string) []model.KeyValue {
tagPairs := strings.Split(agentTags, ",")
func makeModelKeyValue(agentTags map[string]string) []model.KeyValue {
tags := make([]model.KeyValue, 0)
for _, p := range tagPairs {
kv := strings.SplitN(p, "=", 2)
k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1])

if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") {
ed := strings.SplitN(v[2:len(v)-1], ":", 2)
e, d := ed[0], ed[1]
v = os.Getenv(e)
if v == "" && d != "" {
v = d
}
}

for k, v := range agentTags {
tag := model.KeyValue{
Key: k,
VStr: v,
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/tchannel/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (b *Builder) enableDiscovery(channel *tchannel.Channel, logger *zap.Logger)
}

// CreateReporter creates the TChannel-based Reporter
func (b *Builder) CreateReporter(agentTags string, logger *zap.Logger) (*Reporter, error) {
func (b *Builder) CreateReporter(agentTags map[string]string, logger *zap.Logger) (*Reporter, error) {
if b.channel == nil {
// ignore errors since it only happens on empty service name
b.channel, _ = tchannel.NewChannel(agentServiceName, nil)
Expand Down
16 changes: 8 additions & 8 deletions cmd/agent/app/reporter/tchannel/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestBuilderFromConfig(t *testing.T) {
t,
[]string{"127.0.0.1:14267", "127.0.0.1:14268", "127.0.0.1:14269"},
cfg.CollectorHostPorts)
r, err := cfg.CreateReporter("", zap.NewNop())
r, err := cfg.CreateReporter(nil, zap.NewNop())
require.NoError(t, err)
assert.NotNil(t, r)
assert.Equal(t, "some-collector-service", r.CollectorServiceName())
Expand All @@ -60,34 +60,34 @@ func TestBuilderWithDiscovery(t *testing.T) {
cfg := &Builder{}
discoverer := discovery.FixedDiscoverer([]string{"1.1.1.1:80"})
cfg.WithDiscoverer(discoverer)
_, err := cfg.CreateReporter("", zap.NewNop())
_, err := cfg.CreateReporter(nil, zap.NewNop())
assert.EqualError(t, err, "cannot enable service discovery: both discovery.Discoverer and discovery.Notifier must be specified")

cfg = &Builder{}
notifier := &discovery.Dispatcher{}
cfg.WithDiscoverer(discoverer).WithDiscoveryNotifier(notifier)
agent, err := cfg.CreateReporter("", zap.NewNop())
agent, err := cfg.CreateReporter(nil, zap.NewNop())
assert.NoError(t, err)
assert.NotNil(t, agent)
}

func TestBuilderWithDiscoveryError(t *testing.T) {
tbuilder := NewBuilder().WithDiscoverer(fakeDiscoverer{})
rep, err := tbuilder.CreateReporter("", zap.NewNop())
rep, err := tbuilder.CreateReporter(nil, zap.NewNop())
assert.EqualError(t, err, "cannot enable service discovery: both discovery.Discoverer and discovery.Notifier must be specified")
assert.Nil(t, rep)
}

func TestBuilderWithCollectorServiceName(t *testing.T) {
cfg := &Builder{}
cfg.WithCollectorServiceName("svc")
agent, err := cfg.CreateReporter("", zap.NewNop())
agent, err := cfg.CreateReporter(nil, zap.NewNop())
assert.NoError(t, err)
assert.NotNil(t, agent)
assert.Equal(t, cfg.CollectorServiceName, "svc")

cfg = NewBuilder()
agent, err = cfg.CreateReporter("", zap.NewNop())
agent, err = cfg.CreateReporter(nil, zap.NewNop())
assert.NoError(t, err)
assert.NotNil(t, agent)
assert.Equal(t, cfg.CollectorServiceName, "jaeger-collector")
Expand All @@ -97,7 +97,7 @@ func TestBuilderWithChannel(t *testing.T) {
cfg := &Builder{}
channel, _ := tchannel.NewChannel(agentServiceName, nil)
cfg.WithChannel(channel)
rep, err := cfg.CreateReporter("", zap.NewNop())
rep, err := cfg.CreateReporter(nil, zap.NewNop())
assert.NoError(t, err)
assert.NotNil(t, rep.Channel())
assert.Equal(t, defaultCollectorServiceName, rep.CollectorServiceName())
Expand All @@ -108,7 +108,7 @@ func TestBuilderWithCollectors(t *testing.T) {
cfg := &Builder{
CollectorHostPorts: hostPorts,
}
agent, err := cfg.CreateReporter("", zap.NewNop())
agent, err := cfg.CreateReporter(nil, zap.NewNop())
assert.NoError(t, err)
assert.NotNil(t, agent)

Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/tchannel/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type ProxyBuilder struct {
}

// NewCollectorProxy creates ProxyBuilder
func NewCollectorProxy(builder *Builder, agentTags string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
func NewCollectorProxy(builder *Builder, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
r, err := builder.CreateReporter(agentTags, logger)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions cmd/agent/app/reporter/tchannel/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func TestErrorReporterBuilder(t *testing.T) {
tbuilder := NewBuilder().WithDiscoverer(fakeDiscoverer{})
b, err := NewCollectorProxy(tbuilder, "", metrics.NullFactory, zap.NewNop())
b, err := NewCollectorProxy(tbuilder, nil, metrics.NullFactory, zap.NewNop())
require.Error(t, err)
assert.Nil(t, b)
}
Expand All @@ -38,10 +38,10 @@ func TestCreate(t *testing.T) {
cfg := &Builder{}
mFactory := metrics.NullFactory
logger := zap.NewNop()
b, err := NewCollectorProxy(cfg, "", mFactory, logger)
b, err := NewCollectorProxy(cfg, nil, mFactory, logger)
require.NoError(t, err)
assert.NotNil(t, b)
r, _ := cfg.CreateReporter("", logger)
r, _ := cfg.CreateReporter(nil, logger)
assert.Equal(t, reporter.WrapWithMetrics(r, mFactory), b.GetReporter())
m := tchannel.NewConfigManager(r.CollectorServiceName(), r.Channel())
assert.Equal(t, configmanager.WrapWithMetrics(m, mFactory), b.GetManager())
Expand Down
24 changes: 4 additions & 20 deletions cmd/agent/app/reporter/tchannel/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package tchannel

import (
"os"
"strings"
"time"

"github.com/uber/tchannel-go"
Expand Down Expand Up @@ -46,13 +44,13 @@ func New(
channel *tchannel.Channel,
reportTimeout time.Duration,
peerListMgr *peerlistmgr.PeerListManager,
agentTagString string,
agentTagString map[string]string,
zlogger *zap.Logger,
) *Reporter {
thriftClient := thrift.NewClient(channel, collectorServiceName, nil)
zClient := zipkincore.NewTChanZipkinCollectorClient(thriftClient)
jClient := jaeger.NewTChanCollectorClient(thriftClient)
agentTags := parseAgentTags(agentTagString)
agentTags := makeJaegerTag(agentTagString)
return &Reporter{
channel: channel,
zClient: zClient,
Expand Down Expand Up @@ -137,23 +135,9 @@ func addAgentTags(spans []*jaeger.Span, agentTags []jaeger.Tag) []*jaeger.Span {
return spans
}

// Parsing logic borrowed from jaegertracing/jaeger-client-go
func parseAgentTags(agentTags string) []jaeger.Tag {
tagPairs := strings.Split(agentTags, ",")
func makeJaegerTag(agentTags map[string]string) []jaeger.Tag {
tags := make([]jaeger.Tag, 0)
for _, p := range tagPairs {
kv := strings.SplitN(p, "=", 2)
k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1])

if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") {
ed := strings.SplitN(v[2:len(v)-1], ":", 2)
e, d := ed[0], ed[1]
v = os.Getenv(e)
if v == "" && d != "" {
v = d
}
}

for k, v := range agentTags {
tag := jaeger.Tag{
Key: k,
VStr: &v,
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/tchannel/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func initRequirements(t *testing.T) (*metricstest.Factory, *testutils.MockTCollector, *Reporter) {
metricsFactory, collector := testutils.InitMockCollector(t)
reporter := New("jaeger-collector", collector.Channel, time.Second, nil, "", zap.NewNop())
reporter := New("jaeger-collector", collector.Channel, time.Second, nil, nil, zap.NewNop())
return metricsFactory, collector, reporter
}

Expand Down

0 comments on commit 15d2b20

Please sign in to comment.