Skip to content

Commit

Permalink
[HCP Observability] OTELExporter (#17128)
Browse files Browse the repository at this point in the history
* Client configured with TLS using HCP config and retry/throttle

* run go mod tidy

* Remove one abstraction to use the config from deps

* Address PR feedback

* Client configured with TLS using HCP config and retry/throttle

* run go mod tidy

* Create new OTELExporter which uses the MetricsClient
Add transform because the conversion is in an /internal package

* Fix lint error

* early return when there are no metrics

* Add NewOTELExporter() function

* Downgrade to metrics SDK version: v1.15.0-rc.1

* Fix imports

* fix small nits with comments and url.URL

* Fix tests by asserting actual error for context cancellation, fix parallel, and make mock more versatile

* Cleanup error handling and clarify empty metrics case

* Fix input/expected naming in otel_transform_test.go

* add comment for metric tracking

* Add a general isEmpty method

* Add clear error types

* update to latest version 1.15.0 of OTEL
  • Loading branch information
Achooo committed May 18, 2023
1 parent 21642f6 commit dafc826
Show file tree
Hide file tree
Showing 7 changed files with 705 additions and 13 deletions.
12 changes: 12 additions & 0 deletions agent/hcp/telemetry/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Package telemetry implements functionality to collect, aggregate, convert and export
// telemetry data in OpenTelemetry Protocol (OTLP) format.
//
// The entrypoint is the OpenTelemetry (OTEL) go-metrics sink which:
// - Receives metric data.
// - Aggregates metric data using the OTEL Go Metrics SDK.
// - Exports metric data using a configurable OTEL exporter.
//
// The package also provides an OTEL exporter implementation to be used within the sink, which:
// - Transforms metric data from the Metrics SDK OTEL representation to OTLP format.
// - Exports OTLP metric data to an external endpoint using a configurable client.
package telemetry
72 changes: 72 additions & 0 deletions agent/hcp/telemetry/otel_exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package telemetry

import (
"context"
"net/url"

"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"

hcpclient "github.com/hashicorp/consul/agent/hcp/client"
)

// OTELExporter is a custom implementation of an OTEL Metrics SDK metric.Exporter.
// The exporter is used by an OTEL Metrics SDK PeriodicReader to export aggregated metrics.
// This allows us to use a custom client - HCP authenticated MetricsClient.
type OTELExporter struct {
// client receives the transformed OTLP metrics in Export.
client hcpclient.MetricsClient
// url is the export destination; Export passes url.String() to the client as the endpoint.
url url.URL
}

// NewOTELExporter builds an OTELExporter that sends metrics through the given
// client to the given url.
func NewOTELExporter(client hcpclient.MetricsClient, url url.URL) *OTELExporter {
	exporter := new(OTELExporter)
	exporter.client = client
	exporter.url = url
	return exporter
}

// Temporality reports the temporality used for metrics aggregation, which is
// Cumulative for every instrument kind.
// Telemetry Gateway stores metrics in Prometheus format, so Cumulative aggregation is the default.
func (e *OTELExporter) Temporality(_ metric.InstrumentKind) metricdata.Temporality {
	temporality := metricdata.CumulativeTemporality
	return temporality
}

// Aggregation selects the aggregation for the given instrument kind:
//   - observable gauges use LastValue,
//   - histograms use an ExplicitBucketHistogram with fixed boundaries,
//   - every other kind (counters included) uses Sum.
//
// The DefaultAggregationSelector provided by the OTEL Metrics SDK panics;
// this version replicates its logic without the panic.
func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggregation {
	if kind == metric.InstrumentKindObservableGauge {
		return aggregation.LastValue{}
	}

	if kind == metric.InstrumentKindHistogram {
		bucketBoundaries := []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}
		return aggregation.ExplicitBucketHistogram{
			Boundaries: bucketBoundaries,
			NoMinMax:   false,
		}
	}

	// Anything else falls back to a sum rather than panicking.
	return aggregation.Sum{}
}

// Export converts the given metrics with transformOTLP and hands the result to
// the MetricsClient, addressed to the exporter's url.
// When isEmpty reports that the converted metrics are empty, the client is not
// called and nil is returned; otherwise the client's error is returned as-is.
func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
	payload := transformOTLP(metrics)
	if isEmpty(payload) {
		// Nothing to send.
		return nil
	}

	endpoint := e.url.String()
	return e.client.ExportMetrics(ctx, payload, endpoint)
}

// ForceFlush does no flushing work, as the MetricsClient holds no state.
// It only reports the context's error, if any.
func (e *OTELExporter) ForceFlush(ctx context.Context) error {
	// TODO: Emit metric when this operation occurs.
	if err := ctx.Err(); err != nil {
		return err
	}
	return nil
}

// Shutdown does no teardown work, as the MetricsClient is a HTTP client that
// requires no graceful shutdown. It only reports the context's error, if any.
func (e *OTELExporter) Shutdown(ctx context.Context) error {
	// TODO: Emit metric when this operation occurs.
	if err := ctx.Err(); err != nil {
		return err
	}
	return nil
}
140 changes: 140 additions & 0 deletions agent/hcp/telemetry/otel_exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package telemetry

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"

"github.com/hashicorp/consul/agent/hcp/client"
)

// TestTemporality checks that the exporter reports Cumulative temporality.
func TestTemporality(t *testing.T) {
	t.Parallel()

	exporter := new(OTELExporter)
	got := exporter.Temporality(metric.InstrumentKindCounter)
	require.Equal(t, metricdata.CumulativeTemporality, got)
}

// TestAggregation checks the aggregation chosen for gauge, counter and
// histogram instrument kinds.
func TestAggregation(t *testing.T) {
	t.Parallel()

	type testCase struct {
		kind   metric.InstrumentKind
		expAgg aggregation.Aggregation
	}

	cases := map[string]testCase{
		"gauge": {
			kind:   metric.InstrumentKindObservableGauge,
			expAgg: aggregation.LastValue{},
		},
		"counter": {
			kind:   metric.InstrumentKindCounter,
			expAgg: aggregation.Sum{},
		},
		"histogram": {
			kind: metric.InstrumentKindHistogram,
			expAgg: aggregation.ExplicitBucketHistogram{
				Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
				NoMinMax:   false,
			},
		},
	}

	for name, tc := range cases {
		tc := tc // capture per-iteration copy for the parallel subtest
		t.Run(name, func(t *testing.T) {
			t.Parallel()

			exporter := new(OTELExporter)
			got := exporter.Aggregation(tc.kind)
			require.Equal(t, tc.expAgg, got)
		})
	}
}

// mockMetricsClient is a test double used as a client.MetricsClient in these tests.
type mockMetricsClient struct {
// exportErr is the value returned by every ExportMetrics call (nil means success).
exportErr error
}

// ExportMetrics ignores its arguments and returns the configured exportErr.
func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
	configured := m.exportErr
	return configured
}

// TestExport verifies that Export returns early (without calling the client)
// for empty metrics, and that it surfaces the client's error otherwise.
func TestExport(t *testing.T) {
	t.Parallel()
	for name, test := range map[string]struct {
		wantErr string
		metrics *metricdata.ResourceMetrics
		client  client.MetricsClient
	}{
		"earlyReturnWithoutScopeMetrics": {
			// The mock fails if it is reached, so a nil result from Export
			// proves the early return happened instead of a successful export.
			client: &mockMetricsClient{
				exportErr: fmt.Errorf("client must not be called for empty metrics"),
			},
			metrics: mutateMetrics(nil),
		},
		"earlyReturnWithoutMetrics": {
			// Same guard as above: any call to the client makes the test fail.
			client: &mockMetricsClient{
				exportErr: fmt.Errorf("client must not be called for empty metrics"),
			},
			metrics: mutateMetrics([]metricdata.ScopeMetrics{
				{Metrics: []metricdata.Metrics{}},
			},
			),
		},
		"errorWithExportFailure": {
			client: &mockMetricsClient{
				exportErr: fmt.Errorf("failed to export metrics."),
			},
			metrics: mutateMetrics([]metricdata.ScopeMetrics{
				{
					Metrics: []metricdata.Metrics{
						{
							Name: "consul.raft.commitTime",
							Data: metricdata.Gauge[float64]{},
						},
					},
				},
			},
			),
			wantErr: "failed to export metrics",
		},
	} {
		test := test // capture per-iteration copy for the parallel subtest
		t.Run(name, func(t *testing.T) {
			t.Parallel()
			exp := &OTELExporter{
				client: test.client,
			}

			err := exp.Export(context.Background(), test.metrics)
			if test.wantErr != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), test.wantErr)
				return
			}

			require.NoError(t, err)
		})
	}
}

// TestForceFlush checks that ForceFlush reports cancellation of its context.
func TestForceFlush(t *testing.T) {
	t.Parallel()

	// Cancel up front so the context is already done when ForceFlush runs.
	canceledCtx, cancelFn := context.WithCancel(context.Background())
	cancelFn()

	exporter := new(OTELExporter)
	flushErr := exporter.ForceFlush(canceledCtx)
	require.ErrorIs(t, flushErr, context.Canceled)
}

// TestShutdown checks that Shutdown reports cancellation of its context.
func TestShutdown(t *testing.T) {
	t.Parallel()

	// Cancel up front so the context is already done when Shutdown runs.
	canceledCtx, cancelFn := context.WithCancel(context.Background())
	cancelFn()

	exporter := new(OTELExporter)
	shutdownErr := exporter.Shutdown(canceledCtx)
	require.ErrorIs(t, shutdownErr, context.Canceled)
}

// mutateMetrics wraps the given scope metrics in a ResourceMetrics that has an
// empty resource.
func mutateMetrics(m []metricdata.ScopeMetrics) *metricdata.ResourceMetrics {
	rm := new(metricdata.ResourceMetrics)
	rm.Resource = resource.Empty()
	rm.ScopeMetrics = m
	return rm
}
Loading

0 comments on commit dafc826

Please sign in to comment.