Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HCP Observability] Add custom metrics for OTEL sink, improve logging, upgrade modules and cleanup metrics client #17455

Merged
merged 11 commits into from
May 26, 2023
25 changes: 10 additions & 15 deletions agent/hcp/client/metrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"golang.org/x/oauth2"
"google.golang.org/protobuf/proto"

"github.com/hashicorp/consul/version"
)

const (
Expand Down Expand Up @@ -72,8 +74,9 @@ func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, erro
}

header := make(http.Header)
header.Set("Content-Type", "application/x-protobuf")
header.Set("content-type", "application/x-protobuf")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lowercased to be consistent with other ones.

header.Set("x-hcp-resource-id", r.String())
header.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion()))

return &otlpClient{
client: c,
Expand Down Expand Up @@ -124,36 +127,28 @@ func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.R

body, err := proto.Marshal(pbRequest)
if err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
return fmt.Errorf("failed to marshal the request: %w", err)
}

req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
return fmt.Errorf("failed to create request: %w", err)
}
req.Header = *o.header

resp, err := o.client.Do(req.WithContext(ctx))
if err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
return fmt.Errorf("failed to post metrics: %w", err)
}
defer resp.Body.Close()

var respData bytes.Buffer
if _, err := io.Copy(&respData, resp.Body); err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
return fmt.Errorf("failed to read body: %w", err)
}

if respData.Len() != 0 {
Copy link
Contributor Author

@Achooo Achooo May 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FIX: Check status code and remove useless logic in MetricsClient

When testing the export metrics, I hit a case where I would get a 404: not found because the endpoint wasn't hooked up. This made the code continue and try to deserialize an object that wasn't there.

So I added a check that returns an error when the status isn't 200 OK.

The colmetricpb.ExportMetricsServiceResponse returned is an empty struct on the Telemetry Gateway side, so don't bother handling it - delete all that code.

var respProto colmetricpb.ExportMetricsServiceResponse
if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
}

if respProto.PartialSuccess != nil {
msg := respProto.PartialSuccess.GetErrorMessage()
return fmt.Errorf("failed to export metrics: partial success: %s", msg)
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to export metrics: code %d: %s", resp.StatusCode, string(body))
}

return nil
Expand Down
13 changes: 5 additions & 8 deletions agent/hcp/client/metrics_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
colpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"google.golang.org/protobuf/proto"

"github.com/hashicorp/consul/version"
)

func TestNewMetricsClient(t *testing.T) {
Expand Down Expand Up @@ -72,22 +74,17 @@ func TestExportMetrics(t *testing.T) {
},
"failsWithNonRetryableError": {
status: http.StatusBadRequest,
wantErr: "failed to export metrics",
wantErr: "failed to export metrics: code 400",
},
} {
t.Run(name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf")
require.Equal(t, r.Header.Get("content-type"), "application/x-protobuf")
require.Equal(t, r.Header.Get("x-hcp-resource-id"), testResourceID)
require.Equal(t, r.Header.Get("x-channel"), fmt.Sprintf("consul/%s", version.GetHumanVersion()))
require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token")

body := colpb.ExportMetricsServiceResponse{}

if test.wantErr != "" {
body.PartialSuccess = &colpb.ExportMetricsPartialSuccess{
ErrorMessage: "partial failure",
}
}
bytes, err := proto.Marshal(&body)

require.NoError(t, err)
Expand Down
21 changes: 14 additions & 7 deletions agent/hcp/deps.go
Copy link
Contributor Author

@Achooo Achooo May 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMPROVEMENT: Cleanup

  • Removed named params for code readability
  • Add a debug "Initialized HCP Metrics Sink" log to confirm the Sink has been initialized successfully
  • Fix the naming of loggers (to make logs nicer). I added a "hcp" prefix in setup.go, and these here have the related operations as names.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package hcp

import (
"context"
"fmt"
"net/url"
"time"

Expand All @@ -24,20 +25,24 @@ type Deps struct {
Sink metrics.MetricSink
}

func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (d Deps, err error) {
d.Client, err = hcpclient.NewClient(cfg)
func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (Deps, error) {
client, err := hcpclient.NewClient(cfg)
if err != nil {
return
return Deps{}, fmt.Errorf("failed to init client: %w:", err)
Achooo marked this conversation as resolved.
Show resolved Hide resolved
}

d.Provider, err = scada.New(cfg, logger.Named("hcp.scada"))
provider, err := scada.New(cfg, logger.Named("scada"))
if err != nil {
return
return Deps{}, fmt.Errorf("failed to init scada: %w", err)
}

d.Sink = sink(d.Client, &cfg, logger, nodeID)
sink := sink(client, &cfg, logger.Named("sink"), nodeID)

return
return Deps{
Client: client,
Provider: provider,
Sink: sink,
}, nil
}

// sink initializes an OTELSink which forwards Consul metrics to HCP.
Expand Down Expand Up @@ -86,5 +91,7 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo
return nil
}

logger.Debug("initialized HCP metrics sink")

return sink
}
14 changes: 14 additions & 0 deletions agent/hcp/telemetry/custom_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package telemetry
Copy link
Contributor Author

@Achooo Achooo May 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keys for custom metrics for the operations of the sink. These metrics do not need to be registered in consul as a whole (e.g. for prometheus) as they will only exist when the HCP sink is created, so only the OTELSink / Inmemsink will be receiving these metrics when the OTELSink is running.

These metrics will be used to monitor the sink, and potentially also in E2E tests to ensure metrics are exported correctly.

Is a separate file overkill?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate file is ok. I would change the naming slightly to be more specific. I would also prefix with a generic prefix (something like InternalMetric) and drop the type:

var (
	internalMetricTransformFailure = []string{"hcp", "otel", "transform", "failure"}

	internalMetricExportSuccess = []string{"hcp", "otel", "exporter", "export", "sucess"}
	internalMetricExportFailure = []string{"hcp", "otel", "exporter", "export", "failure"}

	internalMetricExporterShutdown = []string{"hcp", "otel", "exporter", "shutdown"}
	internalMetricExporterForceFlush = []string{"hcp", "otel", "exporter", "force_flush"}
)


// Keys for the custom go-metrics counters that instrument the HCP OTEL sink.
// They are incremented by the exporter (otel_exporter.go) on export success,
// export failure, force flush and shutdown, and by the transform code
// (otlp_transform.go) when a metric cannot be transformed. These enable us to
// monitor OTEL operations.
var (
	transformFailureMetric = []string{"hcp", "otel", "transform", "failure"}

	exportSuccessMetric = []string{"hcp", "otel", "exporter", "export", "success"}
	exportFailureMetric = []string{"hcp", "otel", "exporter", "export", "failure"}

	exporterShutdownMetric   = []string{"hcp", "otel", "exporter", "shutdown"}
	exporterForceFlushMetric = []string{"hcp", "otel", "exporter", "force_flush"}
)
15 changes: 12 additions & 3 deletions agent/hcp/telemetry/otel_exporter.go
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMPROVEMENT: Address TODO's to instrument Exporter operations in this file

Send a "hcp.otel.exporter.export.failure" for export failure
Send a "hcp.otel.exporter.export.success" for export success
Send a "hcp.otel.exporter.force_flush" for force flush
Send a "hcp.otel.exporter.shutdown" for shutdown

Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package telemetry

import (
"context"
"fmt"
"net/url"

goMetrics "github.com/armon/go-metrics"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
Expand Down Expand Up @@ -56,17 +58,24 @@ func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceM
if isEmpty(otlpMetrics) {
return nil
}
return e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String())
err := e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String())
if err != nil {
goMetrics.IncrCounter(exportFailureMetric, 1)
return fmt.Errorf("failed to export metrics: %w", err)
}

goMetrics.IncrCounter(exportSuccessMetric, 1)
return nil
}

// ForceFlush has nothing to flush, as the MetricsClient client holds no state.
// It increments the exporterForceFlushMetric counter to record that the
// operation occurred and returns ctx.Err(), so an already cancelled or expired
// context is reported to the caller.
func (e *OTELExporter) ForceFlush(ctx context.Context) error {
	goMetrics.IncrCounter(exporterForceFlushMetric, 1)
	return ctx.Err()
}

// Shutdown performs no teardown, as the MetricsClient is a HTTP client that
// requires no graceful shutdown. It increments the exporterShutdownMetric
// counter to record that the operation occurred and returns ctx.Err(), so an
// already cancelled or expired context is reported to the caller.
func (e *OTELExporter) Shutdown(ctx context.Context) error {
	goMetrics.IncrCounter(exporterShutdownMetric, 1)
	return ctx.Err()
}
85 changes: 77 additions & 8 deletions agent/hcp/telemetry/otel_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"fmt"
"net/url"
"strings"
"testing"
"time"

"github.com/armon/go-metrics"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
Expand All @@ -16,6 +19,14 @@ import (
"github.com/hashicorp/consul/agent/hcp/client"
)

type mockMetricsClient struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just moved this to the top... no changes.

exportErr error
}

// ExportMetrics ignores its arguments and returns the configured exportErr,
// letting a test decide whether the export succeeds (nil) or fails.
func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
	return m.exportErr
}

func TestTemporality(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
Expand Down Expand Up @@ -50,14 +61,6 @@ func TestAggregation(t *testing.T) {
}
}

type mockMetricsClient struct {
exportErr error
}

func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
return m.exportErr
}

func TestExport(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
Expand Down Expand Up @@ -111,6 +114,72 @@ func TestExport(t *testing.T) {
}
}

// TestExport_CustomMetrics tests that a custom metric (hcp.otel.exporter.*) is emitted
// for exporter operations. This test cannot be run in parallel as the metrics.NewGlobal()
// sets a shared global sink.
func TestExport_CustomMetrics(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test for the new exporter metrics metric, which makes a call to export/shutdown/force_flush and checks that the metric is in the inmemsink.

for name, tc := range map[string]struct {
client client.MetricsClient
metricKey []string
operation string
}{
"exportSuccessEmitsCustomMetric": {
client: &mockMetricsClient{},
metricKey: exportSuccessMetric,
operation: "export",
},
"exportFailureEmitsCustomMetric": {
client: &mockMetricsClient{
exportErr: fmt.Errorf("client err"),
},
metricKey: exportFailureMetric,
operation: "export",
},
"shutdownEmitsCustomMetric": {
metricKey: exporterShutdownMetric,
operation: "shutdown",
},
"forceFlushEmitsCustomMetric": {
metricKey: exporterForceFlushMetric,
operation: "flush",
},
} {
t.Run(name, func(t *testing.T) {
// Init global sink.
serviceName := "test.transform"
cfg := metrics.DefaultConfig(serviceName)
cfg.EnableHostname = false

sink := metrics.NewInmemSink(10*time.Second, 10*time.Second)
metrics.NewGlobal(cfg, sink)

// Perform operation that emits metric.
exp := NewOTELExporter(tc.client, &url.URL{})

ctx := context.Background()
switch tc.operation {
case "flush":
exp.ForceFlush(ctx)
case "shutdown":
exp.Shutdown(ctx)
default:
exp.Export(ctx, inputResourceMetrics)
}

// Collect sink metrics.
intervals := sink.Data()
require.Len(t, intervals, 1)
key := serviceName + "." + strings.Join(tc.metricKey, ".")
sv := intervals[0].Counters[key]

// Verify count for the emitted exporter metric.
require.NotNil(t, sv)
require.NotNil(t, sv.AggregateSample)
require.Equal(t, 1, sv.AggregateSample.Count)
})
}
}

func TestForceFlush(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
Expand Down
3 changes: 2 additions & 1 deletion agent/hcp/telemetry/otlp_transform.go
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMPROVEMENT: Address TODO to instrument transform operations in this file

Send a "hcp.otel.transform.failure" for transform failure

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"

goMetrics "github.com/armon/go-metrics"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
cpb "go.opentelemetry.io/proto/otlp/common/v1"
Expand Down Expand Up @@ -70,7 +71,7 @@ func metricsToPB(metrics []metricdata.Metrics) []*mpb.Metric {
for _, m := range metrics {
o, err := metricTypeToPB(m)
if err != nil {
// TODO: Emit metric when a transformation failure occurs.
goMetrics.IncrCounter(transformFailureMetric, 1)
continue
}
out = append(out, o)
Expand Down
Loading