Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove locking from Jaeger exporter shutdown/export #1807

Merged
merged 5 commits into from
Apr 17, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions exporters/trace/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"sync"

"google.golang.org/api/support/bundler"

Expand Down Expand Up @@ -115,8 +114,10 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e
return nil, fmt.Errorf("failed to get service name from default resource")
}

stopCh := make(chan struct{})
e := &Exporter{
uploader: uploader,
stopCh: stopCh,
defaultServiceName: defaultServiceName,
}
bundler := bundler.NewBundler((*sdktrace.SpanSnapshot)(nil), func(bundle interface{}) {
Expand Down Expand Up @@ -180,8 +181,7 @@ type Exporter struct {
bundler *bundler.Bundler
uploader batchUploader

stoppedMu sync.RWMutex
stopped bool
stopCh chan struct{}

defaultServiceName string
}
Expand All @@ -190,13 +190,27 @@ var _ sdktrace.SpanExporter = (*Exporter)(nil)

// ExportSpans exports SpanSnapshots to Jaeger.
func (e *Exporter) ExportSpans(ctx context.Context, ss []*sdktrace.SpanSnapshot) error {
e.stoppedMu.RLock()
stopped := e.stopped
e.stoppedMu.RUnlock()
if stopped {
// Return fast if context is already canceled or Exporter shutdown.
select {
case <-ctx.Done():
return ctx.Err()
case <-e.stopCh:
return nil
default:
}

// Cancel export if Exporter is shutdown.
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
go func(ctx context.Context, cancel context.CancelFunc) {
select {
case <-ctx.Done():
case <-e.stopCh:
cancel()
}
}(ctx, cancel)

for _, span := range ss {
// TODO(jbd): Handle oversized bundlers.
err := e.bundler.AddWait(ctx, span, 1)
Expand All @@ -220,9 +234,8 @@ var flush = func(e *Exporter) {

// Shutdown stops the exporter flushing any pending exports.
func (e *Exporter) Shutdown(ctx context.Context) error {
e.stoppedMu.Lock()
e.stopped = true
e.stoppedMu.Unlock()
// Stop any active and subsequent exports.
close(e.stopCh)

done := make(chan struct{}, 1)
// Shadow so if the goroutine is leaked in testing it doesn't cause a race
Expand Down Expand Up @@ -408,6 +421,12 @@ func getBoolTag(k string, b bool) *gen.Tag {
//
// This is useful if your program is ending and you do not want to lose recent spans.
func (e *Exporter) Flush() {
// Return fast if Exporter shutdown.
select {
case <-e.stopCh:
return
default:
}
flush(e)
}

Expand Down