Skip to content

Commit

Permalink
Export package (#162)
Browse files Browse the repository at this point in the history
* setup sdk exporter package

* use sdk exporter package in sdk trace

* use sdk exporter package in all exporters

* empty the exporters list before testing Load

* move SpanData to the exporter package

* use the SpanProcessor registration, don't register exporters

* rename exporter structs to avoid stutter

* rename Syncer and Batcher to SpanSyncer and SpanBatcher

So it's explicit they are for spans, and we reduce the risk of name
conflict

* remove not moot todo

* rename sdk exporter to export

* only execute the SpanData if it is sampled
  • Loading branch information
dmathieu authored and rghetia committed Oct 8, 2019
1 parent 93c6679 commit c2d5c66
Show file tree
Hide file tree
Showing 14 changed files with 174 additions and 193 deletions.
13 changes: 7 additions & 6 deletions exporter/trace/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@
package jaeger

import (
"context"
"log"

"google.golang.org/api/support/bundler"
"google.golang.org/grpc/codes"

"go.opentelemetry.io/api/core"
gen "go.opentelemetry.io/exporter/trace/jaeger/internal/gen-go/jaeger"
"go.opentelemetry.io/sdk/trace"
"go.opentelemetry.io/sdk/export"
)

const defaultServiceName = "OpenTelemetry"

type Option func(*options)

// options are the options to be used when initializing a Jaeger exporter.
// options are the options to be used when initializing a Jaeger export.
type options struct {
// OnError is the hook to be called when there is
// an error occurred when uploading the span data.
Expand Down Expand Up @@ -142,15 +143,15 @@ type Exporter struct {
uploader batchUploader
}

var _ trace.Exporter = (*Exporter)(nil)
var _ export.SpanSyncer = (*Exporter)(nil)

// ExportSpan exports a SpanData to Jaeger.
func (e *Exporter) ExportSpan(data *trace.SpanData) {
_ = e.bundler.Add(spanDataToThrift(data), 1)
func (e *Exporter) ExportSpan(ctx context.Context, d *export.SpanData) {
_ = e.bundler.Add(spanDataToThrift(d), 1)
// TODO(jbd): Handle oversized bundlers.
}

func spanDataToThrift(data *trace.SpanData) *gen.Span {
func spanDataToThrift(data *export.SpanData) *gen.Span {
tags := make([]*gen.Tag, 0, len(data.Attributes))
for _, kv := range data.Attributes {
tag := coreAttributeToTag(kv)
Expand Down
6 changes: 3 additions & 3 deletions exporter/trace/jaeger/jaeger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

"go.opentelemetry.io/api/core"
gen "go.opentelemetry.io/exporter/trace/jaeger/internal/gen-go/jaeger"
"go.opentelemetry.io/sdk/trace"
"go.opentelemetry.io/sdk/export"
)

// TODO(rghetia): Test export.
Expand All @@ -47,12 +47,12 @@ func Test_spanDataToThrift(t *testing.T) {

tests := []struct {
name string
data *trace.SpanData
data *export.SpanData
want *gen.Span
}{
{
name: "no parent",
data: &trace.SpanData{
data: &export.SpanData{
SpanContext: core.SpanContext{
TraceID: traceID,
SpanID: spanID,
Expand Down
7 changes: 4 additions & 3 deletions exporter/trace/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
package stdout

import (
"context"
"encoding/json"
"io"
"os"

"go.opentelemetry.io/sdk/trace"
"go.opentelemetry.io/sdk/export"
)

// Options are the options to be used when initializing a stdout exporter.
// Options are the options to be used when initializing a stdout export.
type Options struct {
// PrettyPrint will pretty the json representation of the span,
// making it print "pretty". Default is false.
Expand All @@ -43,7 +44,7 @@ func NewExporter(o Options) (*Exporter, error) {
}

// ExportSpan writes a SpanData in json format to stdout.
func (e *Exporter) ExportSpan(data *trace.SpanData) {
func (e *Exporter) ExportSpan(ctx context.Context, data *export.SpanData) {
var jsonSpan []byte
var err error
if e.pretty {
Expand Down
11 changes: 6 additions & 5 deletions exporter/trace/stdout/stdout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,26 @@ package stdout

import (
"bytes"
"context"
"encoding/json"
"testing"
"time"

"google.golang.org/grpc/codes"

"go.opentelemetry.io/api/core"
"go.opentelemetry.io/sdk/trace"
"go.opentelemetry.io/sdk/export"
)

func TestExporter_ExportSpan(t *testing.T) {
exporter, err := NewExporter(Options{})
ex, err := NewExporter(Options{})
if err != nil {
t.Errorf("Error constructing stdout exporter %s", err)
}

// override output writer for testing
var b bytes.Buffer
exporter.outputWriter = &b
ex.outputWriter = &b

// setup test span
now := time.Now()
Expand All @@ -43,7 +44,7 @@ func TestExporter_ExportSpan(t *testing.T) {
keyValue := "value"
doubleValue := float64(123.456)

testSpan := &trace.SpanData{
testSpan := &export.SpanData{
SpanContext: core.SpanContext{
TraceID: traceID,
SpanID: spanID,
Expand All @@ -63,7 +64,7 @@ func TestExporter_ExportSpan(t *testing.T) {
},
Status: codes.Unknown,
}
exporter.ExportSpan(testSpan)
ex.ExportSpan(context.Background(), testSpan)

expectedSerializedNow, _ := json.Marshal(now)

Expand Down
40 changes: 40 additions & 0 deletions sdk/export/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package export

import (
"context"
)

// SpanSyncer is a type for functions that receive a single sampled trace span.
//
// The ExportSpan method is called synchronously. Therefore, it should not take
// forever to process the span.
//
// The SpanData should not be modified.
type SpanSyncer interface {
ExportSpan(context.Context, *SpanData)
}

// SpanBatcher is a type for functions that receive batched of sampled trace
// spans.
//
// The ExportSpans method is called asynchronously. However its should not take
// forever to process the spans.
//
// The SpanData should not be modified.
type SpanBatcher interface {
ExportSpans(context.Context, []*SpanData)
}
66 changes: 1 addition & 65 deletions sdk/trace/export.go → sdk/export/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package trace
package export

import (
"sync"
"sync/atomic"
"time"

"google.golang.org/grpc/codes"
Expand All @@ -25,68 +23,6 @@ import (
apitrace "go.opentelemetry.io/api/trace"
)

// BatchExporter is a type for functions that receive sampled trace spans.
//
// The ExportSpans method is called asynchronously. However BatchExporter should
// not take forever to process the spans.
//
// The SpanData should not be modified.
type BatchExporter interface {
ExportSpans(sds []*SpanData)
}

// Exporter is a type for functions that receive sampled trace spans.
//
// The ExportSpan method should be safe for concurrent use and should return
// quickly; if an Exporter takes a significant amount of time to process a
// SpanData, that work should be done on another goroutine.
//
// The SpanData should not be modified, but a pointer to it can be kept.
type Exporter interface {
ExportSpan(s *SpanData)
}

type exportersMap map[Exporter]struct{}

var (
exporterMu sync.Mutex
exporters atomic.Value
)

// RegisterExporter adds to the list of Exporters that will receive sampled
// trace spans.
//
// Binaries can register exporters, libraries shouldn't register exporters.
// TODO(rghetia) : Remove it.
func RegisterExporter(e Exporter) {
exporterMu.Lock()
defer exporterMu.Unlock()
new := make(exportersMap)
if old, ok := exporters.Load().(exportersMap); ok {
for k, v := range old {
new[k] = v
}
}
new[e] = struct{}{}
exporters.Store(new)
}

// UnregisterExporter removes from the list of Exporters the Exporter that was
// registered with the given name.
// TODO(rghetia) : Remove it.
func UnregisterExporter(e Exporter) {
exporterMu.Lock()
defer exporterMu.Unlock()
new := make(exportersMap)
if old, ok := exporters.Load().(exportersMap); ok {
for k, v := range old {
new[k] = v
}
}
delete(new, e)
exporters.Store(new)
}

// SpanData contains all the information collected by a span.
type SpanData struct {
SpanContext core.SpanContext
Expand Down
41 changes: 22 additions & 19 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"errors"
"sync"
"time"

"go.opentelemetry.io/sdk/export"
)

const (
Expand Down Expand Up @@ -58,13 +60,13 @@ type BatchSpanProcessorOptions struct {
}

// BatchSpanProcessor implements SpanProcessor interfaces. It is used by
// exporters to receive SpanData asynchronously.
// exporters to receive export.SpanData asynchronously.
// Use BatchSpanProcessorOptions to change the behavior of the processor.
type BatchSpanProcessor struct {
exporter BatchExporter
o BatchSpanProcessorOptions
e export.SpanBatcher
o BatchSpanProcessorOptions

queue chan *SpanData
queue chan *export.SpanData
dropped uint32

stopWait sync.WaitGroup
Expand All @@ -75,11 +77,11 @@ type BatchSpanProcessor struct {
var _ SpanProcessor = (*BatchSpanProcessor)(nil)

// NewBatchSpanProcessor creates a new instance of BatchSpanProcessor
// for a given exporter. It returns an error if exporter is nil.
// for a given export. It returns an error if exporter is nil.
// The newly created BatchSpanProcessor should then be registered with sdk
// using RegisterSpanProcessor.
func NewBatchSpanProcessor(exporter BatchExporter, opts ...BatchSpanProcessorOption) (*BatchSpanProcessor, error) {
if exporter == nil {
func NewBatchSpanProcessor(e export.SpanBatcher, opts ...BatchSpanProcessorOption) (*BatchSpanProcessor, error) {
if e == nil {
return nil, errNilExporter
}

Expand All @@ -92,11 +94,11 @@ func NewBatchSpanProcessor(exporter BatchExporter, opts ...BatchSpanProcessorOpt
opt(&o)
}
bsp := &BatchSpanProcessor{
exporter: exporter,
o: o,
e: e,
o: o,
}

bsp.queue = make(chan *SpanData, bsp.o.MaxQueueSize)
bsp.queue = make(chan *export.SpanData, bsp.o.MaxQueueSize)

bsp.stopCh = make(chan struct{})

Expand All @@ -122,11 +124,11 @@ func NewBatchSpanProcessor(exporter BatchExporter, opts ...BatchSpanProcessorOpt
}

// OnStart method does nothing.
func (bsp *BatchSpanProcessor) OnStart(sd *SpanData) {
func (bsp *BatchSpanProcessor) OnStart(sd *export.SpanData) {
}

// OnEnd method enqueues SpanData for later processing.
func (bsp *BatchSpanProcessor) OnEnd(sd *SpanData) {
// OnEnd method enqueues export.SpanData for later processing.
func (bsp *BatchSpanProcessor) OnEnd(sd *export.SpanData) {
bsp.enqueue(sd)
}

Expand Down Expand Up @@ -164,34 +166,35 @@ func WithBlocking() BatchSpanProcessorOption {
}

func (bsp *BatchSpanProcessor) processQueue() {
batch := make([]*SpanData, 0, bsp.o.MaxExportBatchSize)
batch := make([]*export.SpanData, 0, bsp.o.MaxExportBatchSize)
for {
var sd *SpanData
var sd *export.SpanData
var ok bool
select {
case sd = <-bsp.queue:
if sd != nil {
if sd != nil && sd.SpanContext.IsSampled() {
batch = append(batch, sd)
}
ok = true
default:
ok = false
}

if ok {
if len(batch) >= bsp.o.MaxExportBatchSize {
bsp.exporter.ExportSpans(batch)
bsp.e.ExportSpans(context.Background(), batch)
batch = batch[:0]
}
} else {
if len(batch) > 0 {
bsp.exporter.ExportSpans(batch)
bsp.e.ExportSpans(context.Background(), batch)
}
break
}
}
}

func (bsp *BatchSpanProcessor) enqueue(sd *SpanData) {
func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
if bsp.o.BlockOnQueueFull {
bsp.queue <- sd
} else {
Expand Down
Loading

0 comments on commit c2d5c66

Please sign in to comment.