From d08277e30bfb4a3fe05d6597db57e74f6a1875ba Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Thu, 3 Oct 2019 04:09:06 -0300 Subject: [PATCH 1/3] change jaeger options to functional style --- exporter/trace/jaeger/example/main.go | 10 +- exporter/trace/jaeger/jaeger.go | 126 ++++++++---------------- exporter/trace/jaeger/uploader.go | 135 ++++++++++++++++++++++++++ 3 files changed, 178 insertions(+), 93 deletions(-) create mode 100644 exporter/trace/jaeger/uploader.go diff --git a/exporter/trace/jaeger/example/main.go b/exporter/trace/jaeger/example/main.go index bea839b66ab..43d5acd8e89 100644 --- a/exporter/trace/jaeger/example/main.go +++ b/exporter/trace/jaeger/example/main.go @@ -30,12 +30,12 @@ func main() { ctx := context.Background() // Create Jaeger Exporter - exporter, err := jaeger.NewExporter(jaeger.Options{ - CollectorEndpoint: "http://localhost:14268/api/traces", - Process: jaeger.Process{ + exporter, err := jaeger.NewExporter( + jaeger.WithCollectorEndpoint("http://localhost:14268/api/traces"), + jaeger.WithProcess(jaeger.Process{ ServiceName: "trace-demo", - }, - }) + }), + ) if err != nil { log.Fatal(err) } diff --git a/exporter/trace/jaeger/jaeger.go b/exporter/trace/jaeger/jaeger.go index b7621f68d90..ff038a17cd0 100644 --- a/exporter/trace/jaeger/jaeger.go +++ b/exporter/trace/jaeger/jaeger.go @@ -15,17 +15,9 @@ package jaeger import ( - "bytes" - "errors" - "fmt" - "io" - "io/ioutil" - "log" - "net/http" - - "github.com/apache/thrift/lib/go/thrift" "google.golang.org/api/support/bundler" "google.golang.org/grpc/codes" + "log" "go.opentelemetry.io/api/core" gen "go.opentelemetry.io/exporter/trace/jaeger/internal/gen-go/jaeger" @@ -34,30 +26,15 @@ import ( const defaultServiceName = "OpenTelemetry" -// Options are the options to be used when initializing a Jaeger exporter. -type Options struct { - // CollectorEndpoint is the full url to the Jaeger HTTP Thrift collector. - // For example, http://localhost:14268/api/traces - CollectorEndpoint string - - // AgentEndpoint instructs exporter to send spans to jaeger-agent at this address. - // For example, localhost:6831. - AgentEndpoint string +type Option func(*options) +// options are the options to be used when initializing a Jaeger exporter. +type options struct { // OnError is the hook to be called when there is // an error occurred when uploading the stats data. // If no custom hook is set, errors are logged. - // Optional. OnError func(err error) - // Username to be used if basic auth is required. - // Optional. - Username string - - // Password to be used if basic auth is required. - // Optional. - Password string - // Process contains the information about the exporting process. Process Process @@ -65,24 +42,42 @@ type Options struct { BufferMaxCount int } +// WithOnError sets the hook to be called when there is +// an error occurred when uploading the stats data. +// If no custom hook is set, errors are logged. +func WithOnError(onError func(err error)) func(o *options) { + return func(o *options) { + o.OnError = onError + } +} + +// WithProcess sets the process with the information about the exporting process. +func WithProcess(process Process) func(o *options) { + return func(o *options) { + o.Process = process + } +} + +//WithBufferMaxCount defines the total number of traces that can be buffered in memory +func WithBufferMaxCount(bufferMaxCount int) func(o *options) { + return func(o *options) { + o.BufferMaxCount = bufferMaxCount + } +} + // NewExporter returns a trace.Exporter implementation that exports // the collected spans to Jaeger. -func NewExporter(o Options) (*Exporter, error) { - if o.CollectorEndpoint == "" && o.AgentEndpoint == "" { - return nil, errors.New("missing endpoint for Jaeger exporter") +func NewExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) { + uploader, err := endpointOption() + if err != nil { + return nil, err } - var endpoint string - var client *agentClientUDP - var err error - if o.CollectorEndpoint != "" { - endpoint = o.CollectorEndpoint - } else { - client, err = newAgentClientUDP(o.AgentEndpoint, udpPacketMaxLength) - if err != nil { - return nil, err - } + o := options{} + for _, opt := range opts { + opt(&o) } + onError := func(err error) { if o.OnError != nil { o.OnError(err) @@ -99,11 +94,7 @@ func NewExporter(o Options) (*Exporter, error) { tags[i] = attributeToTag(tag.key, tag.value) } e := &Exporter{ - endpoint: endpoint, - agentEndpoint: o.AgentEndpoint, - client: client, - username: o.Username, - password: o.Password, + uploader: uploader, process: &gen.Process{ ServiceName: service, Tags: tags, @@ -152,6 +143,7 @@ type Exporter struct { client *agentClientUDP username, password string + uploader batchUploader } var _ trace.Exporter = (*Exporter)(nil) @@ -328,48 +320,6 @@ func (e *Exporter) upload(spans []*gen.Span) error { Spans: spans, Process: e.process, } - if e.endpoint != "" { - return e.uploadCollector(batch) - } - return e.uploadAgent(batch) -} - -func (e *Exporter) uploadAgent(batch *gen.Batch) error { - return e.client.EmitBatch(batch) -} - -func (e *Exporter) uploadCollector(batch *gen.Batch) error { - body, err := serialize(batch) - if err != nil { - return err - } - req, err := http.NewRequest("POST", e.endpoint, body) - if err != nil { - return err - } - if e.username != "" && e.password != "" { - req.SetBasicAuth(e.username, e.password) - } - req.Header.Set("Content-Type", "application/x-thrift") - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - - _, _ = io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode) - } - return nil -} - -func serialize(obj thrift.TStruct) (*bytes.Buffer, error) { - buf := thrift.NewTMemoryBuffer() - if err := obj.Write(thrift.NewTBinaryProtocolTransport(buf)); err != nil { - return nil, err - } - return buf.Buffer, nil + return e.uploader.upload(batch) } diff --git a/exporter/trace/jaeger/uploader.go b/exporter/trace/jaeger/uploader.go new file mode 100644 index 00000000000..56ec16d903a --- /dev/null +++ b/exporter/trace/jaeger/uploader.go @@ -0,0 +1,135 @@ +package jaeger + +import ( + "bytes" + "errors" + "fmt" + "github.com/apache/thrift/lib/go/thrift" + gen "go.opentelemetry.io/exporter/trace/jaeger/internal/gen-go/jaeger" + "io" + "io/ioutil" + "net/http" +) + +// batchUploader send a batch of spans to Jaeger +type batchUploader interface { + upload(batch *gen.Batch) error +} + +type EndpointOption func() (batchUploader, error) + +// WithAgentEndpoint instructs exporter to send spans to jaeger-agent at this address. +// For example, localhost:6831. +func WithAgentEndpoint(agentEndpoint string) func() (batchUploader, error) { + return func() (batchUploader, error) { + if agentEndpoint == "" { + return nil, errors.New("agentEndpoint must not be empty.") + } + + client, err := newAgentClientUDP(agentEndpoint, udpPacketMaxLength) + if err != nil { + return nil, err + } + + return &agentUploader{client: client}, nil + } +} + +// WithCollectorEndpoint defines the full url to the Jaeger HTTP Thrift collector. +// For example, http://localhost:14268/api/traces +func WithCollectorEndpoint(collectorEndpoint string, options ...CollectorEndpointOption) func() (batchUploader, error) { + return func() (batchUploader, error) { + if collectorEndpoint == "" { + return nil, errors.New("collectorEndpoint must not be empty.") + } + + o := &CollectorEndpointOptions{} + for _, opt := range options { + opt(o) + } + + return &collectorUploader{ + endpoint: collectorEndpoint, + username: o.username, + password: o.password, + }, nil + } +} + +type CollectorEndpointOption func(o *CollectorEndpointOptions) + +type CollectorEndpointOptions struct { + // username to be used if basic auth is required. + username string + + // password to be used if basic auth is required. + password string +} + +// WithUsername sets the username to be used if basic auth is required. +func WithUsername(username string) func(o *CollectorEndpointOptions) { + return func(o *CollectorEndpointOptions) { + o.username = username + } +} + +// WithPassword sets the password to be used if basic auth is required. +func WithPassword(password string) func(o *CollectorEndpointOptions) { + return func(o *CollectorEndpointOptions) { + o.password = password + } +} + +// agentUploader implements batchUploader interface sending batches to +// Jaeger through the UDP agent. +type agentUploader struct { + client *agentClientUDP +} + +func (a *agentUploader) upload(batch *gen.Batch) error { + return a.client.EmitBatch(batch) +} + +// collectorUploader implements batchUploader interface sending batches to +// Jaeger through the collector http endpoint. +type collectorUploader struct { + endpoint string + username string + password string +} + +func (c *collectorUploader) upload(batch *gen.Batch) error { + body, err := serialize(batch) + if err != nil { + return err + } + req, err := http.NewRequest("POST", c.endpoint, body) + if err != nil { + return err + } + if c.username != "" && c.password != "" { + req.SetBasicAuth(c.username, c.password) + } + req.Header.Set("Content-Type", "application/x-thrift") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + _, _ = io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode) + } + return nil +} + +func serialize(obj thrift.TStruct) (*bytes.Buffer, error) { + buf := thrift.NewTMemoryBuffer() + if err := obj.Write(thrift.NewTBinaryProtocolTransport(buf)); err != nil { + return nil, err + } + return buf.Buffer, nil +} From bb9f3a063f479398ad04c189827df7a38848ce25 Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Thu, 3 Oct 2019 04:25:17 -0300 Subject: [PATCH 2/3] fix lints --- exporter/trace/jaeger/jaeger.go | 14 +++++--------- exporter/trace/jaeger/uploader.go | 6 ++++-- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/exporter/trace/jaeger/jaeger.go b/exporter/trace/jaeger/jaeger.go index ff038a17cd0..2eaae375603 100644 --- a/exporter/trace/jaeger/jaeger.go +++ b/exporter/trace/jaeger/jaeger.go @@ -15,9 +15,10 @@ package jaeger import ( + "log" + "google.golang.org/api/support/bundler" "google.golang.org/grpc/codes" - "log" "go.opentelemetry.io/api/core" gen "go.opentelemetry.io/exporter/trace/jaeger/internal/gen-go/jaeger" @@ -136,14 +137,9 @@ type Tag struct { // Exporter is an implementation of trace.Exporter that uploads spans to Jaeger. type Exporter struct { - endpoint string - agentEndpoint string - process *gen.Process - bundler *bundler.Bundler - client *agentClientUDP - - username, password string - uploader batchUploader + process *gen.Process + bundler *bundler.Bundler + uploader batchUploader } var _ trace.Exporter = (*Exporter)(nil) diff --git a/exporter/trace/jaeger/uploader.go b/exporter/trace/jaeger/uploader.go index 56ec16d903a..8ab08bbdcf9 100644 --- a/exporter/trace/jaeger/uploader.go +++ b/exporter/trace/jaeger/uploader.go @@ -4,11 +4,13 @@ import ( "bytes" "errors" "fmt" - "github.com/apache/thrift/lib/go/thrift" - gen "go.opentelemetry.io/exporter/trace/jaeger/internal/gen-go/jaeger" "io" "io/ioutil" "net/http" + + "github.com/apache/thrift/lib/go/thrift" + + gen "go.opentelemetry.io/exporter/trace/jaeger/internal/gen-go/jaeger" ) // batchUploader send a batch of spans to Jaeger From 5c585b9f33885238e93bc6317b34d8affda247bd Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Thu, 3 Oct 2019 17:59:55 -0300 Subject: [PATCH 3/3] add interface validaiton --- exporter/trace/jaeger/jaeger.go | 4 ++-- exporter/trace/jaeger/uploader.go | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/exporter/trace/jaeger/jaeger.go b/exporter/trace/jaeger/jaeger.go index 2eaae375603..14a1f882c49 100644 --- a/exporter/trace/jaeger/jaeger.go +++ b/exporter/trace/jaeger/jaeger.go @@ -32,7 +32,7 @@ type Option func(*options) // options are the options to be used when initializing a Jaeger exporter. type options struct { // OnError is the hook to be called when there is - // an error occurred when uploading the stats data. + // an error occurred when uploading the span data. // If no custom hook is set, errors are logged. OnError func(err error) @@ -44,7 +44,7 @@ type options struct { } // WithOnError sets the hook to be called when there is -// an error occurred when uploading the stats data. +// an error occurred when uploading the span data. // If no custom hook is set, errors are logged. func WithOnError(onError func(err error)) func(o *options) { return func(o *options) { diff --git a/exporter/trace/jaeger/uploader.go b/exporter/trace/jaeger/uploader.go index 8ab08bbdcf9..0f6f26d2ac4 100644 --- a/exporter/trace/jaeger/uploader.go +++ b/exporter/trace/jaeger/uploader.go @@ -88,6 +88,8 @@ type agentUploader struct { client *agentClientUDP } +var _ batchUploader = (*agentUploader)(nil) + func (a *agentUploader) upload(batch *gen.Batch) error { return a.client.EmitBatch(batch) } @@ -100,6 +102,8 @@ type collectorUploader struct { password string } +var _ batchUploader = (*collectorUploader)(nil) + func (c *collectorUploader) upload(batch *gen.Batch) error { body, err := serialize(batch) if err != nil {