Skip to content

Commit

Permalink
[metrics] standardize/simplify export pipeline setup (#395)
Browse files Browse the repository at this point in the history
* Introduce simplified export pipeline setup for stdout

* Standardize dogstatsd,stdout,prometheus calling.

* Creates NewRawExporter, NewExportPipeline, InstallNewPipeline methods.
* Uses Options rather than Config throughout for options.

* fix merge conflicts.

Co-authored-by: Liz Fong-Jones <elizabeth@ctyalcove.org>
  • Loading branch information
2 people authored and jmacd committed Jan 2, 2020
1 parent 5eb457a commit 067aa9e
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 111 deletions.
29 changes: 6 additions & 23 deletions api/global/internal/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"io"
"io/ioutil"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand All @@ -15,9 +14,6 @@ import (
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/exporter/metric/stdout"
metrictest "go.opentelemetry.io/otel/internal/metric"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

func TestDirect(t *testing.T) {
Expand Down Expand Up @@ -207,26 +203,13 @@ func TestDefaultSDK(t *testing.T) {
counter.Add(ctx, 1, labels1)

in, out := io.Pipe()
// TODO this should equal a stdout.NewPipeline(), use it.
// Consider also moving the io.Pipe() and go func() call
// below into a test helper somewhere.
sdk := func(options stdout.Options) *push.Controller {
selector := simple.NewWithInexpensiveMeasure()
exporter, err := stdout.New(options)
if err != nil {
panic(err)
}
batcher := ungrouped.New(selector, true)
pusher := push.New(batcher, exporter, time.Second)
pusher.Start()

return pusher
}(stdout.Options{
pusher, err := stdout.InstallNewPipeline(stdout.Options{
Writer: out,
DoNotPrintTime: true,
})

global.SetMeterProvider(sdk)
if err != nil {
panic(err)
}

counter.Add(ctx, 1, labels1)

Expand All @@ -236,9 +219,9 @@ func TestDefaultSDK(t *testing.T) {
ch <- string(data)
}()

sdk.Stop()
pusher.Stop()
out.Close()

require.Equal(t, `{"updates":[{"name":"test.builtin{A=B}","sum":1}]}
require.Equal(t, `{"updates":[{"name":"test.builtin","sum":1}]}
`, <-ch)
}
12 changes: 1 addition & 11 deletions example/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package main
import (
"context"
"log"
"time"

"go.opentelemetry.io/otel/api/distributedcontext"
"go.opentelemetry.io/otel/api/global"
Expand All @@ -26,10 +25,7 @@ import (
"go.opentelemetry.io/otel/api/trace"
metricstdout "go.opentelemetry.io/otel/exporter/metric/stdout"
tracestdout "go.opentelemetry.io/otel/exporter/trace/stdout"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

Expand Down Expand Up @@ -57,19 +53,13 @@ func initTracer() {
}

func initMeter() *push.Controller {
selector := simple.NewWithExactMeasure()
exporter, err := metricstdout.New(metricstdout.Options{
pusher, err := metricstdout.InstallNewPipeline(metricstdout.Options{
Quantiles: []float64{0.5, 0.9, 0.99},
PrettyPrint: false,
})
if err != nil {
log.Panicf("failed to initialize metric stdout exporter %v", err)
}
batcher := defaultkeys.New(selector, metricsdk.NewDefaultLabelEncoder(), true)
pusher := push.New(batcher, exporter, time.Second)
pusher.Start()

global.SetMeterProvider(pusher)
return pusher
}

Expand Down
25 changes: 4 additions & 21 deletions example/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ import (
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporter/metric/prometheus"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

var (
Expand All @@ -37,29 +34,15 @@ var (
)

func initMeter() *push.Controller {
selector := simple.NewWithExactMeasure()
exporter, err := prometheus.NewExporter(prometheus.Options{})

pusher, hf, err := prometheus.InstallNewPipeline(prometheus.Options{})
if err != nil {
log.Panicf("failed to initialize metric stdout exporter %v", err)
log.Panicf("failed to initialize prometheus exporter %v", err)
}
// Prometheus needs to use a stateful batcher since counters (and histogram since they are a collection of Counters)
// are cumulative (i.e., monotonically increasing values) and should not be resetted after each export.
//
// Prometheus uses this approach to be resilient to scrape failures.
// If a Prometheus server tries to scrape metrics from a host and fails for some reason,
// it could try again on the next scrape and no data would be lost, only resolution.
//
// Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
batcher := defaultkeys.New(selector, sdkmetric.NewDefaultLabelEncoder(), true)
pusher := push.New(batcher, exporter, time.Second)
pusher.Start()

http.HandleFunc("/", hf)
go func() {
_ = http.ListenAndServe(":2222", exporter)
_ = http.ListenAndServe(":2222", nil)
}()

global.SetMeterProvider(pusher)
return pusher
}

Expand Down
59 changes: 52 additions & 7 deletions exporter/metric/dogstatsd/dogstatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ package dogstatsd // import "go.opentelemetry.io/otel/exporter/metric/dogstatsd"

import (
"bytes"
"time"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/exporter/metric/internal/statsd"

export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

type (
Config = statsd.Config
Options = statsd.Options

// Exporter implements a dogstatsd-format statsd exporter,
// which encodes label sets as independent fields in the
Expand All @@ -45,20 +51,59 @@ var (
_ export.LabelEncoder = &Exporter{}
)

// New returns a new Dogstatsd-syntax exporter. This type implements
// the metric.LabelEncoder interface, allowing the SDK's unique label
// encoding to be pre-computed for the exporter and stored in the
// LabelSet.
func New(config Config) (*Exporter, error) {
// NewRawExporter returns a new Dogstatsd-syntax exporter for use in a pipeline.
// This type implements the metric.LabelEncoder interface,
// allowing the SDK's unique label encoding to be pre-computed
// for the exporter and stored in the LabelSet.
func NewRawExporter(options Options) (*Exporter, error) {
exp := &Exporter{
LabelEncoder: statsd.NewLabelEncoder(),
}

var err error
exp.Exporter, err = statsd.NewExporter(config, exp)
exp.Exporter, err = statsd.NewExporter(options, exp)
return exp, err
}

// InstallNewPipeline instantiates a NewExportPipeline and registers it globally.
// Typically called as:
// pipeline, err := dogstatsd.InstallNewPipeline(dogstatsd.Options{...})
// if err != nil {
// ...
// }
// defer pipeline.Stop()
// ... Done
func InstallNewPipeline(options Options) (*push.Controller, error) {
controller, err := NewExportPipeline(options)
if err != nil {
return controller, err
}
global.SetMeterProvider(controller)
return controller, err
}

// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// chaining a NewRawExporter into the recommended selectors and batchers.
func NewExportPipeline(options Options) (*push.Controller, error) {
selector := simple.NewWithExactMeasure()
exporter, err := NewRawExporter(options)
if err != nil {
return nil, err
}

// The ungrouped batcher ensures that the export sees the full
// set of labels as dogstatsd tags.
batcher := ungrouped.New(selector, false)

// The pusher automatically recognizes that the exporter
// implements the LabelEncoder interface, which ensures the
// export encoding for labels is encoded in the LabelSet.
pusher := push.New(batcher, exporter, time.Hour)
pusher.Start()

return pusher, nil
}

// AppendName is part of the stats-internal adapter interface.
func (*Exporter) AppendName(rec export.Record, buf *bytes.Buffer) {
_, _ = buf.WriteString(rec.Descriptor().Name())
Expand Down
2 changes: 1 addition & 1 deletion exporter/metric/dogstatsd/dogstatsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestDogstatsLabels(t *testing.T) {
checkpointSet.Add(desc, cagg, key.New("A").String("B"))

var buf bytes.Buffer
exp, err := dogstatsd.New(dogstatsd.Config{
exp, err := dogstatsd.NewRawExporter(dogstatsd.Options{
Writer: &buf,
})
require.Nil(t, err)
Expand Down
16 changes: 1 addition & 15 deletions exporter/metric/dogstatsd/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ import (
"io"
"log"
"sync"
"time"

"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporter/metric/dogstatsd"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

func ExampleNew() {
Expand Down Expand Up @@ -42,8 +38,7 @@ func ExampleNew() {
}()

// Create a meter
selector := simple.NewWithExactMeasure()
exporter, err := dogstatsd.New(dogstatsd.Config{
pusher, err := dogstatsd.NewExportPipeline(dogstatsd.Options{
// The Writer field provides test support.
Writer: writer,

Expand All @@ -54,15 +49,6 @@ func ExampleNew() {
if err != nil {
log.Fatal("Could not initialize dogstatsd exporter:", err)
}
// The ungrouped batcher ensures that the export sees the full
// set of labels as dogstatsd tags.
batcher := ungrouped.New(selector, false)

// The pusher automatically recognizes that the exporter
// implements the LabelEncoder interface, which ensures the
// export encoding for labels is encoded in the LabelSet.
pusher := push.New(batcher, exporter, time.Hour)
pusher.Start()

ctx := context.Background()

Expand Down
22 changes: 11 additions & 11 deletions exporter/metric/internal/statsd/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
)

type (
// Config supports common options that apply to statsd exporters.
Config struct {
// Options supports common options that apply to statsd exporters.
Options struct {
// URL describes the destination for exporting statsd data.
// e.g., udp://host:port
// tcp://host:port
Expand All @@ -57,7 +57,7 @@ type (
// exporters.
Exporter struct {
adapter Adapter
config Config
options Options
conn net.Conn
writer io.Writer
buffer bytes.Buffer
Expand Down Expand Up @@ -88,17 +88,17 @@ var (

// NewExport returns a common implementation for exporters that Export
// statsd syntax.
func NewExporter(config Config, adapter Adapter) (*Exporter, error) {
if config.MaxPacketSize <= 0 {
config.MaxPacketSize = MaxPacketSize
func NewExporter(options Options, adapter Adapter) (*Exporter, error) {
if options.MaxPacketSize <= 0 {
options.MaxPacketSize = MaxPacketSize
}
var writer io.Writer
var conn net.Conn
var err error
if config.Writer != nil {
writer = config.Writer
if options.Writer != nil {
writer = options.Writer
} else {
conn, err = dial(config.URL)
conn, err = dial(options.URL)
if conn != nil {
writer = conn
}
Expand All @@ -108,7 +108,7 @@ func NewExporter(config Config, adapter Adapter) (*Exporter, error) {
// Start() and Stop() API.
return &Exporter{
adapter: adapter,
config: config,
options: options,
conn: conn,
writer: writer,
}, err
Expand Down Expand Up @@ -171,7 +171,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
return
}

if buf.Len() < e.config.MaxPacketSize {
if buf.Len() < e.options.MaxPacketSize {
return
}
if before == 0 {
Expand Down
8 changes: 4 additions & 4 deletions exporter/metric/internal/statsd/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ timer.B.D:%s|ms
t.Run(nkind.String(), func(t *testing.T) {
ctx := context.Background()
writer := &testWriter{}
config := statsd.Config{
options := statsd.Options{
Writer: writer,
MaxPacketSize: 1024,
}
exp, err := statsd.NewExporter(config, adapter)
exp, err := statsd.NewExporter(options, adapter)
if err != nil {
t.Fatal("New error: ", err)
}
Expand Down Expand Up @@ -274,12 +274,12 @@ func TestPacketSplit(t *testing.T) {
t.Run(tcase.name, func(t *testing.T) {
ctx := context.Background()
writer := &testWriter{}
config := statsd.Config{
options := statsd.Options{
Writer: writer,
MaxPacketSize: 1024,
}
adapter := newWithTagsAdapter()
exp, err := statsd.NewExporter(config, adapter)
exp, err := statsd.NewExporter(options, adapter)
if err != nil {
t.Fatal("New error: ", err)
}
Expand Down
Loading

0 comments on commit 067aa9e

Please sign in to comment.