-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HCP Observability] Add custom metrics for OTEL sink, improve logging, upgrade modules and cleanup metrics client #17455
Changes from 10 commits
55109e5
f6b3a9d
1b2500f
01338ee
64899a4
46e0918
ca89326
0305ed3
dd2f7a8
32b1525
b24ab3e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,8 @@ import ( | |
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" | ||
"golang.org/x/oauth2" | ||
"google.golang.org/protobuf/proto" | ||
|
||
"github.com/hashicorp/consul/version" | ||
) | ||
|
||
const ( | ||
|
@@ -72,8 +74,9 @@ func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, erro | |
} | ||
|
||
header := make(http.Header) | ||
header.Set("Content-Type", "application/x-protobuf") | ||
header.Set("content-type", "application/x-protobuf") | ||
header.Set("x-hcp-resource-id", r.String()) | ||
header.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion())) | ||
|
||
return &otlpClient{ | ||
client: c, | ||
|
@@ -124,36 +127,28 @@ func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.R | |
|
||
body, err := proto.Marshal(pbRequest) | ||
if err != nil { | ||
return fmt.Errorf("failed to export metrics: %v", err) | ||
return fmt.Errorf("failed to marshal the request: %w", err) | ||
} | ||
|
||
req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body)) | ||
if err != nil { | ||
return fmt.Errorf("failed to export metrics: %v", err) | ||
return fmt.Errorf("failed to create request: %w", err) | ||
} | ||
req.Header = *o.header | ||
|
||
resp, err := o.client.Do(req.WithContext(ctx)) | ||
if err != nil { | ||
return fmt.Errorf("failed to export metrics: %v", err) | ||
return fmt.Errorf("failed to post metrics: %w", err) | ||
} | ||
defer resp.Body.Close() | ||
|
||
var respData bytes.Buffer | ||
if _, err := io.Copy(&respData, resp.Body); err != nil { | ||
return fmt.Errorf("failed to export metrics: %v", err) | ||
return fmt.Errorf("failed to read body: %w", err) | ||
} | ||
|
||
if respData.Len() != 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FIX: Check status code and remove useless logic in MetricsClient When testing the export metrics, I hit a case where I would get a 404: not found because the endpoint wasn't hooked up. This made the code continue and try to deserialize an object that wasn't there. So I add check to see if the status isn't 200: OK. The |
||
var respProto colmetricpb.ExportMetricsServiceResponse | ||
if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil { | ||
return fmt.Errorf("failed to export metrics: %v", err) | ||
} | ||
|
||
if respProto.PartialSuccess != nil { | ||
msg := respProto.PartialSuccess.GetErrorMessage() | ||
return fmt.Errorf("failed to export metrics: partial success: %s", msg) | ||
} | ||
if resp.StatusCode != http.StatusOK { | ||
return fmt.Errorf("failed to export metrics: code %d: %s", resp.StatusCode, string(body)) | ||
} | ||
|
||
return nil | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMPROVEMENT: Cleanup
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package telemetry | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keys for custom metrics for the operations of the sink. These metrics do not need to be registered in consul as a whole (e.g. for prometheus) as they will only exist when the HCP sink is created, so only the OTELSink / Inmemsink will be receiving these metrics when the OTELSink is running. These metrics will be used to monitor the sink, and potentially also in E2E tests to ensure metrics are exported correctly. Is a separate file overkill? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Separate file is ok. I would change the naming slightly to be more specific. I would also prefix with a generic prefix (something like var (
internalMetricTransformFailure = []string{"hcp", "otel", "transform", "failure"}
internalMetricExportSuccess = []string{"hcp", "otel", "exporter", "export", "sucess"}
internalMetricExportFailure = []string{"hcp", "otel", "exporter", "export", "failure"}
internalMetricExporterShutdown = []string{"hcp", "otel", "exporter", "shutdown"}
internalMetricExporterForceFlush = []string{"hcp", "otel", "exporter", "force_flush"}
) |
||
|
||
// Keys for custom Go Metrics metrics emitted only for the OTEL | ||
// export (exporter.go) and transform (transform.go) failures and successes. | ||
// These enable us to monitor OTEL operations. | ||
var ( | ||
transformFailureMetric []string = []string{"hcp", "otel", "transform", "failure"} | ||
|
||
exportSuccessMetric []string = []string{"hcp", "otel", "exporter", "export", "sucess"} | ||
exportFailureMetric []string = []string{"hcp", "otel", "exporter", "export", "failure"} | ||
|
||
exporterShutdownMetric []string = []string{"hcp", "otel", "exporter", "shutdown"} | ||
exporterForceFlushMetric []string = []string{"hcp", "otel", "exporter", "force_flush"} | ||
) |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMPROVEMENT: Address TODO's to instrument Exporter operations in this file Send a |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,8 +4,11 @@ import ( | |
"context" | ||
"fmt" | ||
"net/url" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
||
"github.com/armon/go-metrics" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/otel/sdk/metric" | ||
"go.opentelemetry.io/otel/sdk/metric/aggregation" | ||
|
@@ -16,6 +19,14 @@ import ( | |
"github.com/hashicorp/consul/agent/hcp/client" | ||
) | ||
|
||
type mockMetricsClient struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just moved this to the top... no changes. |
||
exportErr error | ||
} | ||
|
||
func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error { | ||
return m.exportErr | ||
} | ||
|
||
func TestTemporality(t *testing.T) { | ||
t.Parallel() | ||
exp := &OTELExporter{} | ||
|
@@ -50,14 +61,6 @@ func TestAggregation(t *testing.T) { | |
} | ||
} | ||
|
||
type mockMetricsClient struct { | ||
exportErr error | ||
} | ||
|
||
func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error { | ||
return m.exportErr | ||
} | ||
|
||
func TestExport(t *testing.T) { | ||
t.Parallel() | ||
for name, test := range map[string]struct { | ||
|
@@ -111,6 +114,72 @@ func TestExport(t *testing.T) { | |
} | ||
} | ||
|
||
// TestExport_CustomMetrics tests that a custom metric (hcp.otel.exporter.*) is emitted | ||
// for exporter operations. This test cannot be run in parallel as the metrics.NewGlobal() | ||
// sets a shared global sink. | ||
func TestExport_CustomMetrics(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Test for the new exporter metrics metric, which makes a call to export/shutdown/force_flush and checks that the metric is in the inmemsink. |
||
for name, tc := range map[string]struct { | ||
client client.MetricsClient | ||
metricKey []string | ||
operation string | ||
}{ | ||
"exportSuccessEmitsCustomMetric": { | ||
client: &mockMetricsClient{}, | ||
metricKey: exportSuccessMetric, | ||
operation: "export", | ||
}, | ||
"exportFailureEmitsCustomMetric": { | ||
client: &mockMetricsClient{ | ||
exportErr: fmt.Errorf("client err"), | ||
}, | ||
metricKey: exportFailureMetric, | ||
operation: "export", | ||
}, | ||
"shutdownEmitsCustomMetric": { | ||
metricKey: exporterShutdownMetric, | ||
operation: "shutdown", | ||
}, | ||
"forceFlushEmitsCustomMetric": { | ||
metricKey: exporterForceFlushMetric, | ||
operation: "flush", | ||
}, | ||
} { | ||
t.Run(name, func(t *testing.T) { | ||
// Init global sink. | ||
serviceName := "test.transform" | ||
cfg := metrics.DefaultConfig(serviceName) | ||
cfg.EnableHostname = false | ||
|
||
sink := metrics.NewInmemSink(10*time.Second, 10*time.Second) | ||
metrics.NewGlobal(cfg, sink) | ||
|
||
// Perform operation that emits metric. | ||
exp := NewOTELExporter(tc.client, &url.URL{}) | ||
|
||
ctx := context.Background() | ||
switch tc.operation { | ||
case "flush": | ||
exp.ForceFlush(ctx) | ||
case "shutdown": | ||
exp.Shutdown(ctx) | ||
default: | ||
exp.Export(ctx, inputResourceMetrics) | ||
} | ||
|
||
// Collect sink metrics. | ||
intervals := sink.Data() | ||
require.Len(t, intervals, 1) | ||
key := serviceName + "." + strings.Join(tc.metricKey, ".") | ||
sv := intervals[0].Counters[key] | ||
|
||
// Verify count for transform failure metric. | ||
require.NotNil(t, sv) | ||
require.NotNil(t, sv.AggregateSample) | ||
require.Equal(t, 1, sv.AggregateSample.Count) | ||
}) | ||
} | ||
} | ||
|
||
func TestForceFlush(t *testing.T) { | ||
t.Parallel() | ||
exp := &OTELExporter{} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMPROVEMENT: Address TODO to instrument transform operations in this file Send a `"hcp.otel.transform.failure" for transform failure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lowercased to be consistent with other ones.