-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[HCP Observability] OTELSink (#17159)
* Client configured with TLS using HCP config and retry/throttle * run go mod tidy * Remove one abstraction to use the config from deps * Address PR feedback * Client configured with TLS using HCP config and retry/throttle * run go mod tidy * Create new OTELExporter which uses the MetricsClient Add transform because the conversion is in an /internal package * Fix lint error * early return when there are no metrics * Add NewOTELExporter() function * Downgrade to metrics SDK version: v1.15.0-rc.1 * Fix imports * fix small nits with comments and url.URL * Fix tests by asserting actual error for context cancellation, fix parallel, and make mock more versatile * Cleanup error handling and clarify empty metrics case * Fix input/expected naming in otel_transform_test.go * add comment for metric tracking * Add a general isEmpty method * Add clear error types * update to latest version 1.15.0 of OTEL * Client configured with TLS using HCP config and retry/throttle * run go mod tidy * Remove one abstraction to use the config from deps * Address PR feedback * Initialize OTELSink with sync.Map for all the instrument stores. * Moved PeriodicReader init to NewOtelReader function. This allows us to use a ManualReader for tests. 
* Switch to mutex instead of sync.Map to avoid type assertion * Add gauge store * Clarify comments * return concrete sink type * Fix lint errors * Move gauge store to be within sink * Use context.TODO, rebase and cleanup opts handling * Rebase onto otel exporter to downgrade metrics API to v1.15.0-rc.1 * Fix imports * Update to latest stable version by rebasing on cc-4933, fix import, remove mutex init, fix opts error messages and use logger from ctx * Add lots of documentation to the OTELSink * Fix gauge store comment and check ok * Add select and ctx.Done() check to gauge callback * use require.Equal for attributes * Fixed import naming * Remove float64 calls and add a NewGaugeStore method * Change name Store to Set in gaugeStore, add concurrency tests in both OTELSink and gauge store * Generate 100 gauge operations * Separate the labels into goroutines in sink test * Generate kv store for the test case keys to avoid using uuid * Added a race test with 300 samples for OTELSink * Do not pass in waitgroup and use error channel instead. * Using SHA 7dea2225a218872e86d2f580e82c089b321617b0 to avoid build failures in otel * Fix nits
- Loading branch information
Showing
6 changed files
with
779 additions
and
18 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package telemetry | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
"go.opentelemetry.io/otel/attribute" | ||
"go.opentelemetry.io/otel/metric" | ||
) | ||
|
||
// gaugeStore holds last seen Gauge values for a particular metric (<name,last_value>) in the store. | ||
// OTEL does not currently have a synchronous Gauge instrument. Instead, it allows the registration of callbacks. | ||
// The callbacks are called during export, where the Gauge value must be returned. | ||
// This store is a workaround, which holds last seen Gauge values until the callback is called. | ||
type gaugeStore struct { | ||
store map[string]*gaugeValue | ||
mutex sync.Mutex | ||
} | ||
|
||
// gaugeValues are the last seen measurement for a Gauge metric, which contains a float64 value and labels. | ||
type gaugeValue struct { | ||
Value float64 | ||
Attributes []attribute.KeyValue | ||
} | ||
|
||
// NewGaugeStore returns an initialized empty gaugeStore. | ||
func NewGaugeStore() *gaugeStore { | ||
return &gaugeStore{ | ||
store: make(map[string]*gaugeValue, 0), | ||
} | ||
} | ||
|
||
// LoadAndDelete will read a Gauge value and delete it. | ||
// Once registered for a metric name, a Gauge callback will continue to execute every collection cycel. | ||
// We must delete the value once we have read it, to avoid repeat values being sent. | ||
func (g *gaugeStore) LoadAndDelete(key string) (*gaugeValue, bool) { | ||
g.mutex.Lock() | ||
defer g.mutex.Unlock() | ||
|
||
gauge, ok := g.store[key] | ||
if !ok { | ||
return nil, ok | ||
} | ||
|
||
delete(g.store, key) | ||
|
||
return gauge, ok | ||
} | ||
|
||
// Set adds a gaugeValue to the global gauge store. | ||
func (g *gaugeStore) Set(key string, value float64, labels []attribute.KeyValue) { | ||
g.mutex.Lock() | ||
defer g.mutex.Unlock() | ||
|
||
gv := &gaugeValue{ | ||
Value: value, | ||
Attributes: labels, | ||
} | ||
|
||
g.store[key] = gv | ||
} | ||
|
||
// gaugeCallback returns a callback which gets called when metrics are collected for export. | ||
func (g *gaugeStore) gaugeCallback(key string) metric.Float64Callback { | ||
// Closures keep a reference to the key string, that get garbage collected when code completes. | ||
return func(ctx context.Context, obs metric.Float64Observer) error { | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
default: | ||
if gauge, ok := g.LoadAndDelete(key); ok { | ||
obs.Observe(gauge.Value, metric.WithAttributes(gauge.Attributes...)) | ||
} | ||
return nil | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
package telemetry | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/otel/attribute" | ||
) | ||
|
||
func TestGaugeStore(t *testing.T) { | ||
t.Parallel() | ||
|
||
gaugeStore := NewGaugeStore() | ||
|
||
attributes := []attribute.KeyValue{ | ||
{ | ||
Key: attribute.Key("test_key"), | ||
Value: attribute.StringValue("test_value"), | ||
}, | ||
} | ||
|
||
gaugeStore.Set("test", 1.23, attributes) | ||
|
||
// Should store a new gauge. | ||
val, ok := gaugeStore.LoadAndDelete("test") | ||
require.True(t, ok) | ||
require.Equal(t, val.Value, 1.23) | ||
require.Equal(t, val.Attributes, attributes) | ||
|
||
// Gauge with key "test" have been deleted. | ||
val, ok = gaugeStore.LoadAndDelete("test") | ||
require.False(t, ok) | ||
require.Nil(t, val) | ||
|
||
gaugeStore.Set("duplicate", 1.5, nil) | ||
gaugeStore.Set("duplicate", 6.7, nil) | ||
|
||
// Gauge with key "duplicate" should hold the latest (last seen) value. | ||
val, ok = gaugeStore.LoadAndDelete("duplicate") | ||
require.True(t, ok) | ||
require.Equal(t, val.Value, 6.7) | ||
} | ||
|
||
func TestGaugeCallback_Failure(t *testing.T) { | ||
t.Parallel() | ||
|
||
k := "consul.raft.apply" | ||
gaugeStore := NewGaugeStore() | ||
gaugeStore.Set(k, 1.23, nil) | ||
|
||
cb := gaugeStore.gaugeCallback(k) | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
|
||
cancel() | ||
err := cb(ctx, nil) | ||
require.ErrorIs(t, err, context.Canceled) | ||
} | ||
|
||
// TestGaugeStore_Race induces a race condition. When run with go test -race, | ||
// this test should pass if implementation is concurrency safe. | ||
func TestGaugeStore_Race(t *testing.T) { | ||
t.Parallel() | ||
|
||
gaugeStore := NewGaugeStore() | ||
|
||
wg := &sync.WaitGroup{} | ||
samples := 100 | ||
errCh := make(chan error, samples) | ||
for i := 0; i < samples; i++ { | ||
wg.Add(1) | ||
key := fmt.Sprintf("consul.test.%d", i) | ||
value := 12.34 | ||
go func() { | ||
defer wg.Done() | ||
gaugeStore.Set(key, value, nil) | ||
gv, _ := gaugeStore.LoadAndDelete(key) | ||
if gv.Value != value { | ||
errCh <- fmt.Errorf("expected value: '%f', but got: '%f' for key: '%s'", value, gv.Value, key) | ||
} | ||
}() | ||
} | ||
|
||
wg.Wait() | ||
|
||
require.Empty(t, errCh) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,209 @@ | ||
package telemetry | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"fmt" | ||
"net/url" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
gometrics "github.com/armon/go-metrics" | ||
"github.com/hashicorp/go-hclog" | ||
"go.opentelemetry.io/otel/attribute" | ||
otelmetric "go.opentelemetry.io/otel/metric" | ||
otelsdk "go.opentelemetry.io/otel/sdk/metric" | ||
"go.opentelemetry.io/otel/sdk/resource" | ||
|
||
"github.com/hashicorp/consul/agent/hcp/client" | ||
) | ||
|
||
type OTELSinkOpts struct { | ||
Reader otelsdk.Reader | ||
Ctx context.Context | ||
} | ||
|
||
// OTELSink captures and aggregates telemetry data as per the OpenTelemetry (OTEL) specification. | ||
// Metric data is exported in OpenTelemetry Protocol (OTLP) wire format. | ||
// This should be used as a Go Metrics backend, as it implements the MetricsSink interface. | ||
type OTELSink struct { | ||
// spaceReplacer cleans the flattened key by removing any spaces. | ||
spaceReplacer *strings.Replacer | ||
logger hclog.Logger | ||
|
||
// meterProvider is an OTEL MeterProvider, the entrypoint to the OTEL Metrics SDK. | ||
// It handles reading/export of aggregated metric data. | ||
// It enables creation and usage of an OTEL Meter. | ||
meterProvider *otelsdk.MeterProvider | ||
|
||
// meter is an OTEL Meter, which enables the creation of OTEL instruments. | ||
meter *otelmetric.Meter | ||
|
||
// Instrument stores contain an OTEL Instrument per metric name (<name, instrument>) | ||
// for each gauge, counter and histogram types. | ||
// An instrument allows us to record a measurement for a particular metric, and continuously aggregates metrics. | ||
// We lazy load the creation of these intruments until a metric is seen, and use them repeatedly to record measurements. | ||
gaugeInstruments map[string]otelmetric.Float64ObservableGauge | ||
counterInstruments map[string]otelmetric.Float64Counter | ||
histogramInstruments map[string]otelmetric.Float64Histogram | ||
|
||
// gaugeStore is required to hold last-seen values of gauges | ||
// This is a workaround, as OTEL currently does not have synchronous gauge instruments. | ||
// It only allows the registration of "callbacks", which obtain values when the callback is called. | ||
// We must hold gauge values until the callback is called, when the measurement is exported, and can be removed. | ||
gaugeStore *gaugeStore | ||
|
||
mutex sync.Mutex | ||
} | ||
|
||
// NewOTELReader returns a configured OTEL PeriodicReader to export metrics every X seconds. | ||
// It configures the reader with a custom OTELExporter with a MetricsClient to transform and export | ||
// metrics in OTLP format to an external url. | ||
func NewOTELReader(client client.MetricsClient, url url.URL, exportInterval time.Duration) otelsdk.Reader { | ||
exporter := NewOTELExporter(client, url) | ||
return otelsdk.NewPeriodicReader(exporter, otelsdk.WithInterval(exportInterval)) | ||
} | ||
|
||
// NewOTELSink returns a sink which fits the Go Metrics MetricsSink interface. | ||
// It sets up a MeterProvider and Meter, key pieces of the OTEL Metrics SDK which | ||
// enable us to create OTEL Instruments to record measurements. | ||
func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) { | ||
if opts.Reader == nil { | ||
return nil, fmt.Errorf("ferror: provide valid reader") | ||
} | ||
|
||
if opts.Ctx == nil { | ||
return nil, fmt.Errorf("ferror: provide valid context") | ||
} | ||
|
||
// Setup OTEL Metrics SDK to aggregate, convert and export metrics. | ||
res := resource.NewSchemaless() | ||
meterProvider := otelsdk.NewMeterProvider(otelsdk.WithResource(res), otelsdk.WithReader(opts.Reader)) | ||
meter := meterProvider.Meter("github.com/hashicorp/consul/agent/hcp/telemetry") | ||
|
||
return &OTELSink{ | ||
spaceReplacer: strings.NewReplacer(" ", "_"), | ||
logger: hclog.FromContext(opts.Ctx).Named("otel_sink"), | ||
meterProvider: meterProvider, | ||
meter: &meter, | ||
gaugeStore: NewGaugeStore(), | ||
gaugeInstruments: make(map[string]otelmetric.Float64ObservableGauge, 0), | ||
counterInstruments: make(map[string]otelmetric.Float64Counter, 0), | ||
histogramInstruments: make(map[string]otelmetric.Float64Histogram, 0), | ||
}, nil | ||
} | ||
|
||
// SetGauge emits a Consul gauge metric. | ||
func (o *OTELSink) SetGauge(key []string, val float32) { | ||
o.SetGaugeWithLabels(key, val, nil) | ||
} | ||
|
||
// AddSample emits a Consul histogram metric. | ||
func (o *OTELSink) AddSample(key []string, val float32) { | ||
o.AddSampleWithLabels(key, val, nil) | ||
} | ||
|
||
// IncrCounter emits a Consul counter metric. | ||
func (o *OTELSink) IncrCounter(key []string, val float32) { | ||
o.IncrCounterWithLabels(key, val, nil) | ||
} | ||
|
||
// AddSampleWithLabels emits a Consul gauge metric that gets | ||
// registed by an OpenTelemetry Histogram instrument. | ||
func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) { | ||
k := o.flattenKey(key) | ||
|
||
// Set value in global Gauge store. | ||
o.gaugeStore.Set(k, float64(val), toAttributes(labels)) | ||
|
||
o.mutex.Lock() | ||
defer o.mutex.Unlock() | ||
|
||
// If instrument does not exist, create it and register callback to emit last value in global Gauge store. | ||
if _, ok := o.gaugeInstruments[k]; !ok { | ||
// The registration of a callback only needs to happen once, when the instrument is created. | ||
// The callback will be triggered every export cycle for that metric. | ||
// It must be explicitly de-registered to be removed (which we do not do), to ensure new gauge values are exported every cycle. | ||
inst, err := (*o.meter).Float64ObservableGauge(k, otelmetric.WithFloat64Callback(o.gaugeStore.gaugeCallback(k))) | ||
if err != nil { | ||
o.logger.Error("Failed to emit gauge: %w", err) | ||
return | ||
} | ||
o.gaugeInstruments[k] = inst | ||
} | ||
} | ||
|
||
// AddSampleWithLabels emits a Consul sample metric that gets registed by an OpenTelemetry Histogram instrument. | ||
func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) { | ||
k := o.flattenKey(key) | ||
|
||
o.mutex.Lock() | ||
defer o.mutex.Unlock() | ||
|
||
inst, ok := o.histogramInstruments[k] | ||
if !ok { | ||
histogram, err := (*o.meter).Float64Histogram(k) | ||
if err != nil { | ||
o.logger.Error("Failed to emit gauge: %w", err) | ||
return | ||
} | ||
inst = histogram | ||
o.histogramInstruments[k] = inst | ||
} | ||
|
||
attrs := toAttributes(labels) | ||
inst.Record(context.TODO(), float64(val), otelmetric.WithAttributes(attrs...)) | ||
} | ||
|
||
// IncrCounterWithLabels emits a Consul counter metric that gets registed by an OpenTelemetry Histogram instrument. | ||
func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) { | ||
k := o.flattenKey(key) | ||
|
||
o.mutex.Lock() | ||
defer o.mutex.Unlock() | ||
|
||
inst, ok := o.counterInstruments[k] | ||
if !ok { | ||
counter, err := (*o.meter).Float64Counter(k) | ||
if err != nil { | ||
o.logger.Error("Failed to emit gauge: %w", err) | ||
return | ||
} | ||
|
||
inst = counter | ||
o.counterInstruments[k] = inst | ||
} | ||
|
||
attrs := toAttributes(labels) | ||
inst.Add(context.TODO(), float64(val), otelmetric.WithAttributes(attrs...)) | ||
} | ||
|
||
// EmitKey unsupported. | ||
func (o *OTELSink) EmitKey(key []string, val float32) {} | ||
|
||
// flattenKey key along with its labels. | ||
func (o *OTELSink) flattenKey(parts []string) string { | ||
buf := &bytes.Buffer{} | ||
joined := strings.Join(parts, ".") | ||
|
||
o.spaceReplacer.WriteString(buf, joined) | ||
|
||
return buf.String() | ||
} | ||
|
||
// toAttributes converts go metrics Labels into OTEL format []attributes.KeyValue | ||
func toAttributes(labels []gometrics.Label) []attribute.KeyValue { | ||
if len(labels) == 0 { | ||
return nil | ||
} | ||
attrs := make([]attribute.KeyValue, len(labels)) | ||
for i, label := range labels { | ||
attrs[i] = attribute.KeyValue{ | ||
Key: attribute.Key(label.Name), | ||
Value: attribute.StringValue(label.Value), | ||
} | ||
} | ||
|
||
return attrs | ||
} |
Oops, something went wrong.