Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change jaeger options to functional style #161

Merged
merged 4 commits into from
Oct 4, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions exporter/trace/jaeger/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func main() {
ctx := context.Background()

// Create Jaeger Exporter
exporter, err := jaeger.NewExporter(jaeger.Options{
CollectorEndpoint: "http://localhost:14268/api/traces",
Process: jaeger.Process{
exporter, err := jaeger.NewExporter(
jaeger.WithCollectorEndpoint("http://localhost:14268/api/traces"),
jaeger.WithProcess(jaeger.Process{
ServiceName: "trace-demo",
},
})
}),
)
if err != nil {
log.Fatal(err)
}
Expand Down
134 changes: 40 additions & 94 deletions exporter/trace/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,8 @@
package jaeger

import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"

"github.com/apache/thrift/lib/go/thrift"
"google.golang.org/api/support/bundler"
"google.golang.org/grpc/codes"

Expand All @@ -34,55 +27,58 @@ import (

const defaultServiceName = "OpenTelemetry"

// Options are the options to be used when initializing a Jaeger exporter.
type Options struct {
// CollectorEndpoint is the full url to the Jaeger HTTP Thrift collector.
// For example, http://localhost:14268/api/traces
CollectorEndpoint string

// AgentEndpoint instructs exporter to send spans to jaeger-agent at this address.
// For example, localhost:6831.
AgentEndpoint string
type Option func(*options)

// options are the options to be used when initializing a Jaeger exporter.
type options struct {
// OnError is the hook to be called when there is
// an error occurred when uploading the stats data.
// an error occurred when uploading the span data.
// If no custom hook is set, errors are logged.
// Optional.
OnError func(err error)

// Username to be used if basic auth is required.
// Optional.
Username string

// Password to be used if basic auth is required.
// Optional.
Password string

// Process contains the information about the exporting process.
Process Process

//BufferMaxCount defines the total number of traces that can be buffered in memory
BufferMaxCount int
}

// WithOnError sets the hook to be called when there is
// an error occurred when uploading the span data.
// If no custom hook is set, errors are logged.
func WithOnError(onError func(err error)) func(o *options) {
return func(o *options) {
o.OnError = onError
}
}

// WithProcess sets the process with the information about the exporting process.
func WithProcess(process Process) func(o *options) {
return func(o *options) {
o.Process = process
}
}

//WithBufferMaxCount defines the total number of traces that can be buffered in memory
func WithBufferMaxCount(bufferMaxCount int) func(o *options) {
return func(o *options) {
o.BufferMaxCount = bufferMaxCount
}
}

// NewExporter returns a trace.Exporter implementation that exports
// the collected spans to Jaeger.
func NewExporter(o Options) (*Exporter, error) {
if o.CollectorEndpoint == "" && o.AgentEndpoint == "" {
return nil, errors.New("missing endpoint for Jaeger exporter")
func NewExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) {
uploader, err := endpointOption()
if err != nil {
return nil, err
}

var endpoint string
var client *agentClientUDP
var err error
if o.CollectorEndpoint != "" {
endpoint = o.CollectorEndpoint
} else {
client, err = newAgentClientUDP(o.AgentEndpoint, udpPacketMaxLength)
if err != nil {
return nil, err
}
o := options{}
for _, opt := range opts {
opt(&o)
}

onError := func(err error) {
if o.OnError != nil {
o.OnError(err)
Expand All @@ -99,11 +95,7 @@ func NewExporter(o Options) (*Exporter, error) {
tags[i] = attributeToTag(tag.key, tag.value)
}
e := &Exporter{
endpoint: endpoint,
agentEndpoint: o.AgentEndpoint,
client: client,
username: o.Username,
password: o.Password,
uploader: uploader,
process: &gen.Process{
ServiceName: service,
Tags: tags,
Expand Down Expand Up @@ -145,13 +137,9 @@ type Tag struct {

// Exporter is an implementation of trace.Exporter that uploads spans to Jaeger.
type Exporter struct {
endpoint string
agentEndpoint string
process *gen.Process
bundler *bundler.Bundler
client *agentClientUDP

username, password string
process *gen.Process
bundler *bundler.Bundler
uploader batchUploader
}

var _ trace.Exporter = (*Exporter)(nil)
Expand Down Expand Up @@ -328,48 +316,6 @@ func (e *Exporter) upload(spans []*gen.Span) error {
Spans: spans,
Process: e.process,
}
if e.endpoint != "" {
return e.uploadCollector(batch)
}
return e.uploadAgent(batch)
}

func (e *Exporter) uploadAgent(batch *gen.Batch) error {
return e.client.EmitBatch(batch)
}

func (e *Exporter) uploadCollector(batch *gen.Batch) error {
body, err := serialize(batch)
if err != nil {
return err
}
req, err := http.NewRequest("POST", e.endpoint, body)
if err != nil {
return err
}
if e.username != "" && e.password != "" {
req.SetBasicAuth(e.username, e.password)
}
req.Header.Set("Content-Type", "application/x-thrift")

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}

_, _ = io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode)
}
return nil
}

func serialize(obj thrift.TStruct) (*bytes.Buffer, error) {
buf := thrift.NewTMemoryBuffer()
if err := obj.Write(thrift.NewTBinaryProtocolTransport(buf)); err != nil {
return nil, err
}
return buf.Buffer, nil
return e.uploader.upload(batch)
}
141 changes: 141 additions & 0 deletions exporter/trace/jaeger/uploader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package jaeger

import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"

"github.com/apache/thrift/lib/go/thrift"

gen "go.opentelemetry.io/exporter/trace/jaeger/internal/gen-go/jaeger"
)

// batchUploader send a batch of spans to Jaeger
type batchUploader interface {
upload(batch *gen.Batch) error
}

type EndpointOption func() (batchUploader, error)

// WithAgentEndpoint instructs exporter to send spans to jaeger-agent at this address.
// For example, localhost:6831.
func WithAgentEndpoint(agentEndpoint string) func() (batchUploader, error) {
return func() (batchUploader, error) {
if agentEndpoint == "" {
return nil, errors.New("agentEndpoint must not be empty.")
}

client, err := newAgentClientUDP(agentEndpoint, udpPacketMaxLength)
if err != nil {
return nil, err
}

return &agentUploader{client: client}, nil
}
}

// WithCollectorEndpoint defines the full url to the Jaeger HTTP Thrift collector.
// For example, http://localhost:14268/api/traces
func WithCollectorEndpoint(collectorEndpoint string, options ...CollectorEndpointOption) func() (batchUploader, error) {
return func() (batchUploader, error) {
if collectorEndpoint == "" {
return nil, errors.New("collectorEndpoint must not be empty.")
}

o := &CollectorEndpointOptions{}
for _, opt := range options {
opt(o)
}

return &collectorUploader{
endpoint: collectorEndpoint,
username: o.username,
password: o.password,
}, nil
}
}

type CollectorEndpointOption func(o *CollectorEndpointOptions)

type CollectorEndpointOptions struct {
// username to be used if basic auth is required.
username string

// password to be used if basic auth is required.
password string
}

// WithUsername sets the username to be used if basic auth is required.
func WithUsername(username string) func(o *CollectorEndpointOptions) {
return func(o *CollectorEndpointOptions) {
o.username = username
}
}

// WithPassword sets the password to be used if basic auth is required.
func WithPassword(password string) func(o *CollectorEndpointOptions) {
return func(o *CollectorEndpointOptions) {
o.password = password
}
}

// agentUploader implements batchUploader interface sending batches to
// Jaeger through the UDP agent.
type agentUploader struct {
client *agentClientUDP
}
paivagustavo marked this conversation as resolved.
Show resolved Hide resolved

var _ batchUploader = (*agentUploader)(nil)

func (a *agentUploader) upload(batch *gen.Batch) error {
return a.client.EmitBatch(batch)
}

// collectorUploader implements batchUploader interface sending batches to
// Jaeger through the collector http endpoint.
type collectorUploader struct {
endpoint string
username string
password string
}
paivagustavo marked this conversation as resolved.
Show resolved Hide resolved

var _ batchUploader = (*collectorUploader)(nil)

func (c *collectorUploader) upload(batch *gen.Batch) error {
body, err := serialize(batch)
if err != nil {
return err
}
req, err := http.NewRequest("POST", c.endpoint, body)
if err != nil {
return err
}
if c.username != "" && c.password != "" {
req.SetBasicAuth(c.username, c.password)
}
req.Header.Set("Content-Type", "application/x-thrift")

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}

_, _ = io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode)
}
return nil
}

func serialize(obj thrift.TStruct) (*bytes.Buffer, error) {
buf := thrift.NewTMemoryBuffer()
if err := obj.Write(thrift.NewTBinaryProtocolTransport(buf)); err != nil {
return nil, err
}
return buf.Buffer, nil
}