From 4ea371c8218b9d7d5af54869fec1f6bee1b11abd Mon Sep 17 00:00:00 2001 From: Ashvitha Date: Wed, 24 May 2023 11:49:46 -0400 Subject: [PATCH] [HCP Observability] Init OTELSink in Telemetry (#17162) * Move hcp client to subpackage hcpclient (#16800) * [HCP Observability] New MetricsClient (#17100) * Client configured with TLS using HCP config and retry/throttle * Add tests and godoc for metrics client * close body after request * run go mod tidy * Remove one abstraction to use the config from deps * Address PR feedback * remove clone * Extract CloudConfig and mock for future PR * Switch to hclog.FromContext * [HCP Observability] New MetricsClient (#17100) * Client configured with TLS using HCP config and retry/throttle * Add tests and godoc for metrics client * close body after request * run go mod tidy * Remove one abstraction to use the config from deps * Address PR feedback * remove clone * Extract CloudConfig and mock for future PR * Switch to hclog.FromContext * [HCP Observability] New MetricsClient (#17100) * Client configured with TLS using HCP config and retry/throttle * Add tests and godoc for metrics client * close body after request * run go mod tidy * Remove one abstraction to use the config from deps * Address PR feedback * remove clone * Extract CloudConfig and mock for future PR * Switch to hclog.FromContext * Client configured with TLS using HCP config and retry/throttle * run go mod tidy * Remove one abstraction to use the config from deps * Address PR feedback * Client configured with TLS using HCP config and retry/throttle * run go mod tidy * Create new OTELExporter which uses the MetricsClient Add transform because the conversion is in an /internal package * Fix lint error * early return when there are no metrics * Add NewOTELExporter() function * Downgrade to metrics SDK version: v1.15.0-rc.1 * Fix imports * fix small nits with comments and url.URL * Fix tests by asserting actual error for context cancellation, fix parallel, and make mock more 
versatile * Cleanup error handling and clarify empty metrics case * Fix input/expected naming in otel_transform_test.go * add comment for metric tracking * Add a general isEmpty method * Add clear error types * update to latest version 1.15.0 of OTEL * Client configured with TLS using HCP config and retry/throttle * run go mod tidy * Remove one abstraction to use the config from deps * Address PR feedback * Initialize OTELSink with sync.Map for all the instrument stores. * Moved PeriodicReader init to NewOtelReader function. This allows us to use a ManualReader for tests. * Switch to mutex instead of sync.Map to avoid type assertion * Add gauge store * Clarify comments * return concrete sink type * Fix lint errors * Move gauge store to be within sink * Use context.TODO, rebase and cleanup opts handling * Rebase onto otl exporter to downgrade metrics API to v1.15.0-rc.1 * Fix imports * Update to latest stable version by rebasing on cc-4933, fix import, remove mutex init, fix opts error messages and use logger from ctx * Add lots of documentation to the OTELSink * Fix gauge store comment and check ok * Add select and ctx.Done() check to gauge callback * use require.Equal for attributes * Fixed import naming * Remove float64 calls and add a NewGaugeStore method * Change name Store to Set in gaugeStore, add concurrency tests in both OTELSink and gauge store * Generate 100 gauge operations * Separate the labels into goroutines in sink test * Generate kv store for the test case keys to avoid using uuid * Added a race test with 300 samples for OTELSink * [HCP Observability] OTELExporter (#17128) * Client configured with TLS using HCP config and retry/throttle * run go mod tidy * Remove one abstraction to use the config from deps * Address PR feedback * Client configured with TLS using HCP config and retry/throttle * run go mod tidy * Create new OTELExporter which uses the MetricsClient Add transform because the conversion is in an /internal package * Fix lint error * early 
return when there are no metrics * Add NewOTELExporter() function * Downgrade to metrics SDK version: v1.15.0-rc.1 * Fix imports * fix small nits with comments and url.URL * Fix tests by asserting actual error for context cancellation, fix parallel, and make mock more versatile * Cleanup error handling and clarify empty metrics case * Fix input/expected naming in otel_transform_test.go * add comment for metric tracking * Add a general isEmpty method * Add clear error types * update to latest version 1.15.0 of OTEL * Do not pass in waitgroup and use error channel instead. * Using SHA 7dea2225a218872e86d2f580e82c089b321617b0 to avoid build failures in otel * Rebase onto otl exporter to downgrade metrics API to v1.15.0-rc.1 * Initialize OTELSink with sync.Map for all the instrument stores. * Added telemetry agent to client and init sink in deps * Fixed client * Initialize sink in deps * init sink in telemetry library * Init deps before telemetry * Use concrete telemetry.OtelSink type * add /v1/metrics * Avoid returning err for telemetry init * move sink init within the IsCloudEnabled() * Use HCPSinkOpts in deps instead * update golden test for configuration file * Switch to using extra sinks in the telemetry library * keep name MetricsConfig * fix log in verifyCCMRegistration * Set logger in context * pass around MetricSink in deps * Fix imports * Rebased onto otel sink pr * Fix URL in test * [HCP Observability] OTELSink (#17159) * Client configured with TLS using HCP config and retry/throttle * run go mod tidy * Remove one abstraction to use the config from deps * Address PR feedback * Client configured with TLS using HCP config and retry/throttle * run go mod tidy * Create new OTELExporter which uses the MetricsClient Add transform because the conversion is in an /internal package * Fix lint error * early return when there are no metrics * Add NewOTELExporter() function * Downgrade to metrics SDK version: v1.15.0-rc.1 * Fix imports * fix small nits with comments and 
url.URL * Fix tests by asserting actual error for context cancellation, fix parallel, and make mock more versatile * Cleanup error handling and clarify empty metrics case * Fix input/expected naming in otel_transform_test.go * add comment for metric tracking * Add a general isEmpty method * Add clear error types * update to latest version 1.15.0 of OTEL * Client configured with TLS using HCP config and retry/throttle * run go mod tidy * Remove one abstraction to use the config from deps * Address PR feedback * Initialize OTELSink with sync.Map for all the instrument stores. * Moved PeriodicReader init to NewOtelReader function. This allows us to use a ManualReader for tests. * Switch to mutex instead of sync.Map to avoid type assertion * Add gauge store * Clarify comments * return concrete sink type * Fix lint errors * Move gauge store to be within sink * Use context.TODO, rebase and cleanup opts handling * Rebase onto otl exporter to downgrade metrics API to v1.15.0-rc.1 * Fix imports * Update to latest stable version by rebasing on cc-4933, fix import, remove mutex init, fix opts error messages and use logger from ctx * Add lots of documentation to the OTELSink * Fix gauge store comment and check ok * Add select and ctx.Done() check to gauge callback * use require.Equal for attributes * Fixed import naming * Remove float64 calls and add a NewGaugeStore method * Change name Store to Set in gaugeStore, add concurrency tests in both OTELSink and gauge store * Generate 100 gauge operations * Separate the labels into goroutines in sink test * Generate kv store for the test case keys to avoid using uuid * Added a race test with 300 samples for OTELSink * Do not pass in waitgroup and use error channel instead. 
* Using SHA 7dea2225a218872e86d2f580e82c089b321617b0 to avoid build failures in otel * Fix nits * pass extraSinks as function param instead * Add default interval as package export * remove verifyCCM func * Add clusterID * Fix import and add t.Parallel() for missing tests * Kick Vercel CI * Remove scheme from endpoint path, and fix error logging * return metrics.MetricSink for sink method * Update SDK --- agent/hcp/client/client.go | 67 +++++++++++++- agent/hcp/client/client_test.go | 75 ++++++++++++++++ agent/hcp/client/mock_Client.go | 81 ++++++++++++++++- agent/hcp/config/config.go | 2 +- agent/hcp/deps.go | 60 +++++++++++++ agent/hcp/deps_test.go | 103 ++++++++++++++++++++++ agent/hcp/telemetry/otel_exporter.go | 12 +-- agent/hcp/telemetry/otel_exporter_test.go | 5 +- agent/hcp/telemetry/otel_sink.go | 6 +- agent/setup.go | 20 +++-- go.mod | 2 +- go.sum | 4 +- lib/telemetry.go | 13 ++- lib/telemetry_test.go | 11 ++- 14 files changed, 427 insertions(+), 34 deletions(-) create mode 100644 agent/hcp/client/client_test.go create mode 100644 agent/hcp/deps_test.go diff --git a/agent/hcp/client/client.go b/agent/hcp/client/client.go index d2ba98a63c1c6..aec1039525aa5 100644 --- a/agent/hcp/client/client.go +++ b/agent/hcp/client/client.go @@ -12,23 +12,44 @@ import ( httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" - "github.com/hashicorp/consul/agent/hcp/config" - "github.com/hashicorp/consul/version" + hcptelemetry "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service" hcpgnm "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/client/global_network_manager_service" gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models" "github.com/hashicorp/hcp-sdk-go/httpclient" "github.com/hashicorp/hcp-sdk-go/resource" + + "github.com/hashicorp/consul/agent/hcp/config" + 
"github.com/hashicorp/consul/version" ) +// metricsGatewayPath is the default path for metrics export request on the Telemetry Gateway. +const metricsGatewayPath = "/v1/metrics" + // Client interface exposes HCP operations that can be invoked by Consul // //go:generate mockery --name Client --with-expecter --inpackage type Client interface { FetchBootstrap(ctx context.Context) (*BootstrapConfig, error) + FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig, error) PushServerStatus(ctx context.Context, status *ServerStatus) error DiscoverServers(ctx context.Context) ([]string, error) } +// MetricsConfig holds metrics specific configuration for the TelemetryConfig. +// The endpoint field overrides the TelemetryConfig endpoint. +type MetricsConfig struct { + Filters []string + Endpoint string +} + +// TelemetryConfig contains configuration for telemetry data forwarded by Consul servers +// to the HCP Telemetry gateway. +type TelemetryConfig struct { + Endpoint string + Labels map[string]string + MetricsConfig *MetricsConfig +} + type BootstrapConfig struct { Name string BootstrapExpect int @@ -44,6 +65,7 @@ type hcpClient struct { hc *httptransport.Runtime cfg config.CloudConfig gnm hcpgnm.ClientService + tgw hcptelemetry.ClientService resource resource.Resource } @@ -64,6 +86,8 @@ func NewClient(cfg config.CloudConfig) (Client, error) { } client.gnm = hcpgnm.New(client.hc, nil) + client.tgw = hcptelemetry.New(client.hc, nil) + return client, nil } @@ -79,6 +103,29 @@ func httpClient(c config.CloudConfig) (*httptransport.Runtime, error) { }) } +// FetchTelemetryConfig obtains telemetry configuration from the Telemetry Gateway. +func (c *hcpClient) FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig, error) { + params := hcptelemetry.NewAgentTelemetryConfigParamsWithContext(ctx). + WithLocationOrganizationID(c.resource.Organization). + WithLocationProjectID(c.resource.Project). 
+ WithClusterID(c.resource.ID) + + resp, err := c.tgw.AgentTelemetryConfig(params, nil) + if err != nil { + return nil, err + } + + payloadConfig := resp.Payload.TelemetryConfig + return &TelemetryConfig{ + Endpoint: payloadConfig.Endpoint, + Labels: payloadConfig.Labels, + MetricsConfig: &MetricsConfig{ + Filters: payloadConfig.Metrics.IncludeList, + Endpoint: payloadConfig.Metrics.Endpoint, + }, + }, nil +} + func (c *hcpClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, error) { version := version.GetHumanVersion() params := hcpgnm.NewAgentBootstrapConfigParamsWithContext(ctx). @@ -233,3 +280,19 @@ func (c *hcpClient) DiscoverServers(ctx context.Context) ([]string, error) { return servers, nil } + +// Enabled verifies if telemetry is enabled by ensuring a valid endpoint has been retrieved. +// It returns full metrics endpoint and true if a valid endpoint was obtained. +func (t *TelemetryConfig) Enabled() (string, bool) { + endpoint := t.Endpoint + if override := t.MetricsConfig.Endpoint; override != "" { + endpoint = override + } + + if endpoint == "" { + return "", false + } + + // The endpoint from Telemetry Gateway is a domain without scheme, and without the metrics path, so they must be added. 
+ return endpoint + metricsGatewayPath, true +} diff --git a/agent/hcp/client/client_test.go b/agent/hcp/client/client_test.go new file mode 100644 index 0000000000000..43ecf0fd5c45d --- /dev/null +++ b/agent/hcp/client/client_test.go @@ -0,0 +1,75 @@ +package client + +import ( + "context" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestFetchTelemetryConfig(t *testing.T) { + t.Parallel() + for name, test := range map[string]struct { + metricsEndpoint string + expect func(*MockClient) + disabled bool + }{ + "success": { + expect: func(mockClient *MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{ + Endpoint: "https://test.com", + MetricsConfig: &MetricsConfig{ + Endpoint: "", + }, + }, nil) + }, + metricsEndpoint: "https://test.com/v1/metrics", + }, + "overrideMetricsEndpoint": { + expect: func(mockClient *MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{ + Endpoint: "https://test.com", + MetricsConfig: &MetricsConfig{ + Endpoint: "https://test.com", + }, + }, nil) + }, + metricsEndpoint: "https://test.com/v1/metrics", + }, + "disabledWithEmptyEndpoint": { + expect: func(mockClient *MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{ + Endpoint: "", + MetricsConfig: &MetricsConfig{ + Endpoint: "", + }, + }, nil) + }, + disabled: true, + }, + } { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + + mock := NewMockClient(t) + test.expect(mock) + + telemetryCfg, err := mock.FetchTelemetryConfig(context.Background()) + require.NoError(t, err) + + if test.disabled { + endpoint, ok := telemetryCfg.Enabled() + require.False(t, ok) + require.Empty(t, endpoint) + return + } + + endpoint, ok := telemetryCfg.Enabled() + + require.True(t, ok) + require.Equal(t, test.metricsEndpoint, endpoint) + }) + } +} diff --git a/agent/hcp/client/mock_Client.go 
b/agent/hcp/client/mock_Client.go index 27eb35a747cb3..06853ceb86f76 100644 --- a/agent/hcp/client/mock_Client.go +++ b/agent/hcp/client/mock_Client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.15.0. DO NOT EDIT. +// Code generated by mockery v2.22.1. DO NOT EDIT. package client @@ -26,6 +26,10 @@ func (_m *MockClient) DiscoverServers(ctx context.Context) ([]string, error) { ret := _m.Called(ctx) var r0 []string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]string, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) []string); ok { r0 = rf(ctx) } else { @@ -34,7 +38,6 @@ func (_m *MockClient) DiscoverServers(ctx context.Context) ([]string, error) { } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -67,11 +70,20 @@ func (_c *MockClient_DiscoverServers_Call) Return(_a0 []string, _a1 error) *Mock return _c } +func (_c *MockClient_DiscoverServers_Call) RunAndReturn(run func(context.Context) ([]string, error)) *MockClient_DiscoverServers_Call { + _c.Call.Return(run) + return _c +} + // FetchBootstrap provides a mock function with given fields: ctx func (_m *MockClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, error) { ret := _m.Called(ctx) var r0 *BootstrapConfig + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*BootstrapConfig, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) *BootstrapConfig); ok { r0 = rf(ctx) } else { @@ -80,7 +92,6 @@ func (_m *MockClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, err } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -113,6 +124,65 @@ func (_c *MockClient_FetchBootstrap_Call) Return(_a0 *BootstrapConfig, _a1 error return _c } +func (_c *MockClient_FetchBootstrap_Call) RunAndReturn(run func(context.Context) (*BootstrapConfig, error)) *MockClient_FetchBootstrap_Call { + _c.Call.Return(run) + 
return _c +} + +// FetchTelemetryConfig provides a mock function with given fields: ctx +func (_m *MockClient) FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig, error) { + ret := _m.Called(ctx) + + var r0 *TelemetryConfig + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*TelemetryConfig, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *TelemetryConfig); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*TelemetryConfig) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockClient_FetchTelemetryConfig_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FetchTelemetryConfig' +type MockClient_FetchTelemetryConfig_Call struct { + *mock.Call +} + +// FetchTelemetryConfig is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockClient_Expecter) FetchTelemetryConfig(ctx interface{}) *MockClient_FetchTelemetryConfig_Call { + return &MockClient_FetchTelemetryConfig_Call{Call: _e.mock.On("FetchTelemetryConfig", ctx)} +} + +func (_c *MockClient_FetchTelemetryConfig_Call) Run(run func(ctx context.Context)) *MockClient_FetchTelemetryConfig_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockClient_FetchTelemetryConfig_Call) Return(_a0 *TelemetryConfig, _a1 error) *MockClient_FetchTelemetryConfig_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockClient_FetchTelemetryConfig_Call) RunAndReturn(run func(context.Context) (*TelemetryConfig, error)) *MockClient_FetchTelemetryConfig_Call { + _c.Call.Return(run) + return _c +} + // PushServerStatus provides a mock function with given fields: ctx, status func (_m *MockClient) PushServerStatus(ctx context.Context, status *ServerStatus) error { ret := _m.Called(ctx, status) @@ -151,6 +221,11 @@ func (_c 
*MockClient_PushServerStatus_Call) Return(_a0 error) *MockClient_PushSe return _c } +func (_c *MockClient_PushServerStatus_Call) RunAndReturn(run func(context.Context, *ServerStatus) error) *MockClient_PushServerStatus_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewMockClient interface { mock.TestingT Cleanup(func()) diff --git a/agent/hcp/config/config.go b/agent/hcp/config/config.go index a6d4c31979db4..cf87b685e352a 100644 --- a/agent/hcp/config/config.go +++ b/agent/hcp/config/config.go @@ -46,6 +46,6 @@ func (c *CloudConfig) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfi if c.ScadaAddress != "" { opts = append(opts, hcpcfg.WithSCADA(c.ScadaAddress, c.TLSConfig)) } - opts = append(opts, hcpcfg.FromEnv()) + opts = append(opts, hcpcfg.FromEnv(), hcpcfg.WithoutBrowserLogin()) return hcpcfg.NewHCPConfig(opts...) } diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go index b4d67154fb45f..702f7d27a396b 100644 --- a/agent/hcp/deps.go +++ b/agent/hcp/deps.go @@ -4,9 +4,15 @@ package hcp import ( + "context" + "net/url" + "time" + + "github.com/armon/go-metrics" hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/agent/hcp/scada" + "github.com/hashicorp/consul/agent/hcp/telemetry" "github.com/hashicorp/go-hclog" ) @@ -14,6 +20,7 @@ import ( type Deps struct { Client hcpclient.Client Provider scada.Provider + Sink metrics.MetricSink } func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) { @@ -23,5 +30,58 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) { } d.Provider, err = scada.New(cfg, logger.Named("hcp.scada")) + if err != nil { + return + } + + d.Sink = sink(d.Client, &cfg, logger) + return } + +// sink initializes an OTELSink which forwards Consul metrics to HCP. +// The sink is only initialized if the server is registered with the management plane (CCM). 
+// This step should not block server initialization, so errors are logged, but not returned. +func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Logger) metrics.MetricSink { + ctx := context.Background() + ctx = hclog.WithContext(ctx, logger) + + reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + telemetryCfg, err := hcpClient.FetchTelemetryConfig(reqCtx) + if err != nil { + logger.Error("failed to fetch telemetry config", "error", err) + return nil + } + + endpoint, isEnabled := telemetryCfg.Enabled() + if !isEnabled { + return nil + } + + u, err := url.Parse(endpoint) + if err != nil { + logger.Error("failed to parse url endpoint", "error", err) + return nil + } + + metricsClient, err := hcpclient.NewMetricsClient(cfg, ctx) + if err != nil { + logger.Error("failed to init metrics client", "error", err) + return nil + } + + sinkOpts := &telemetry.OTELSinkOpts{ + Ctx: ctx, + Reader: telemetry.NewOTELReader(metricsClient, u, telemetry.DefaultExportInterval), + } + + sink, err := telemetry.NewOTELSink(sinkOpts) + if err != nil { + logger.Error("failed to init OTEL sink", "error", err) + return nil + } + + return sink +} diff --git a/agent/hcp/deps_test.go b/agent/hcp/deps_test.go new file mode 100644 index 0000000000000..017d7c791c5e2 --- /dev/null +++ b/agent/hcp/deps_test.go @@ -0,0 +1,103 @@ +package hcp + +import ( + "fmt" + "testing" + + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/hcp/client" +) + +func TestSink(t *testing.T) { + t.Parallel() + for name, test := range map[string]struct { + expect func(*client.MockClient) + mockCloudCfg client.CloudConfig + expectedSink bool + }{ + "success": { + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ + Endpoint: "https://test.com", + MetricsConfig: &client.MetricsConfig{ + 
Endpoint: "https://test.com", + }, + }, nil) + }, + mockCloudCfg: client.MockCloudCfg{}, + expectedSink: true, + }, + "noSinkWhenServerNotRegisteredWithCCM": { + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ + Endpoint: "", + MetricsConfig: &client.MetricsConfig{ + Endpoint: "", + }, + }, nil) + }, + mockCloudCfg: client.MockCloudCfg{}, + }, + "noSinkWhenCCMVerificationFails": { + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("fetch failed")) + }, + mockCloudCfg: client.MockCloudCfg{}, + }, + "noSinkWhenMetricsClientInitFails": { + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ + Endpoint: "https://test.com", + MetricsConfig: &client.MetricsConfig{ + Endpoint: "", + }, + }, nil) + }, + mockCloudCfg: client.MockErrCloudCfg{}, + }, + "failsWithFetchTelemetryFailure": { + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("FetchTelemetryConfig error")) + }, + }, + "failsWithURLParseErr": { + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ + // Minimum 2 chars for a domain to be valid. 
+ Endpoint: "s", + MetricsConfig: &client.MetricsConfig{ + // Invalid domain chars + Endpoint: " ", + }, + }, nil) + }, + }, + "noErrWithEmptyEndpoint": { + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ + Endpoint: "", + MetricsConfig: &client.MetricsConfig{ + Endpoint: "", + }, + }, nil) + }, + }, + } { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + c := client.NewMockClient(t) + l := hclog.NewNullLogger() + test.expect(c) + sinkOpts := sink(c, test.mockCloudCfg, l) + if !test.expectedSink { + require.Nil(t, sinkOpts) + return + } + require.NotNil(t, sinkOpts) + }) + } +} diff --git a/agent/hcp/telemetry/otel_exporter.go b/agent/hcp/telemetry/otel_exporter.go index 02f6a07f83715..2512706f5353c 100644 --- a/agent/hcp/telemetry/otel_exporter.go +++ b/agent/hcp/telemetry/otel_exporter.go @@ -15,15 +15,15 @@ import ( // The exporter is used by a OTEL Metrics SDK PeriodicReader to export aggregated metrics. // This allows us to use a custom client - HCP authenticated MetricsClient. type OTELExporter struct { - client hcpclient.MetricsClient - url url.URL + client hcpclient.MetricsClient + endpoint *url.URL } // NewOTELExporter returns a configured OTELExporter -func NewOTELExporter(client hcpclient.MetricsClient, url url.URL) *OTELExporter { +func NewOTELExporter(client hcpclient.MetricsClient, endpoint *url.URL) *OTELExporter { return &OTELExporter{ - client: client, - url: url, + client: client, + endpoint: endpoint, } } @@ -56,7 +56,7 @@ func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceM if isEmpty(otlpMetrics) { return nil } - return e.client.ExportMetrics(ctx, otlpMetrics, e.url.String()) + return e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String()) } // ForceFlush is a no-op, as the MetricsClient client holds no state. 
diff --git a/agent/hcp/telemetry/otel_exporter_test.go b/agent/hcp/telemetry/otel_exporter_test.go index 0e3d3fcc1e4f6..72e6b84d242c6 100644 --- a/agent/hcp/telemetry/otel_exporter_test.go +++ b/agent/hcp/telemetry/otel_exporter_test.go @@ -3,6 +3,7 @@ package telemetry import ( "context" "fmt" + "net/url" "testing" "github.com/stretchr/testify/require" @@ -96,9 +97,7 @@ func TestExport(t *testing.T) { test := test t.Run(name, func(t *testing.T) { t.Parallel() - exp := &OTELExporter{ - client: test.client, - } + exp := NewOTELExporter(test.client, &url.URL{}) err := exp.Export(context.Background(), test.metrics) if test.wantErr != "" { diff --git a/agent/hcp/telemetry/otel_sink.go b/agent/hcp/telemetry/otel_sink.go index ec5e2d476b1a8..9a984150b773d 100644 --- a/agent/hcp/telemetry/otel_sink.go +++ b/agent/hcp/telemetry/otel_sink.go @@ -19,6 +19,10 @@ import ( "github.com/hashicorp/consul/agent/hcp/client" ) +// DefaultExportInterval is a default time interval between export of aggregated metrics. +const DefaultExportInterval = 10 * time.Second + +// OTELSinkOpts is used to provide configuration when initializing an OTELSink using NewOTELSink. type OTELSinkOpts struct { Reader otelsdk.Reader Ctx context.Context @@ -60,7 +64,7 @@ type OTELSink struct { // NewOTELReader returns a configured OTEL PeriodicReader to export metrics every X seconds. // It configures the reader with a custom OTELExporter with a MetricsClient to transform and export // metrics in OTLP format to an external url. 
-func NewOTELReader(client client.MetricsClient, url url.URL, exportInterval time.Duration) otelsdk.Reader { +func NewOTELReader(client client.MetricsClient, url *url.URL, exportInterval time.Duration) otelsdk.Reader { exporter := NewOTELExporter(client, url) return otelsdk.NewPeriodicReader(exporter, otelsdk.WithInterval(exportInterval)) } diff --git a/agent/setup.go b/agent/setup.go index a4520e3cfcbd8..46e60d58b2667 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" wal "github.com/hashicorp/raft-wal" @@ -101,7 +102,18 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl cfg.Telemetry.PrometheusOpts.CounterDefinitions = counters cfg.Telemetry.PrometheusOpts.SummaryDefinitions = summaries - d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger) + var extraSinks []metrics.MetricSink + if cfg.IsCloudEnabled() { + d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger) + if err != nil { + return d, err + } + if d.HCP.Sink != nil { + extraSinks = append(extraSinks, d.HCP.Sink) + } + } + + d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger, extraSinks...) 
if err != nil { return d, fmt.Errorf("failed to initialize telemetry: %w", err) } @@ -192,12 +204,6 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl d.EventPublisher = stream.NewEventPublisher(10 * time.Second) d.XDSStreamLimiter = limiter.NewSessionLimiter() - if cfg.IsCloudEnabled() { - d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger) - if err != nil { - return d, err - } - } return d, nil } diff --git a/go.mod b/go.mod index f0bc0826dfea5..e4e32cea37329 100644 --- a/go.mod +++ b/go.mod @@ -62,7 +62,7 @@ require ( github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/hcl v1.0.0 github.com/hashicorp/hcp-scada-provider v0.2.3 - github.com/hashicorp/hcp-sdk-go v0.44.1-0.20230508124639-28da4c5b03f3 + github.com/hashicorp/hcp-sdk-go v0.46.1-0.20230519164650-51657675d9e7 github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038 github.com/hashicorp/memberlist v0.5.0 github.com/hashicorp/raft v1.5.0 diff --git a/go.sum b/go.sum index 91a396da17b97..bffefaaf6aab5 100644 --- a/go.sum +++ b/go.sum @@ -610,8 +610,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcp-scada-provider v0.2.3 h1:AarYR+/Pcv+cMvPdAlb92uOBmZfEH6ny4+DT+4NY2VQ= github.com/hashicorp/hcp-scada-provider v0.2.3/go.mod h1:ZFTgGwkzNv99PLQjTsulzaCplCzOTBh0IUQsPKzrQFo= -github.com/hashicorp/hcp-sdk-go v0.44.1-0.20230508124639-28da4c5b03f3 h1:9QstZdsLIS6iPyYxQoyymRz8nBw9jMdEbGy29gtgzVQ= -github.com/hashicorp/hcp-sdk-go v0.44.1-0.20230508124639-28da4c5b03f3/go.mod h1:hZqky4HEzsKwvLOt4QJlZUrjeQmb4UCZUhDP2HyQFfc= +github.com/hashicorp/hcp-sdk-go v0.46.1-0.20230519164650-51657675d9e7 h1:/7/5kyyCT5tCeRanKIJAfP8Z6JnjEV55PNuI6phn2k0= +github.com/hashicorp/hcp-sdk-go v0.46.1-0.20230519164650-51657675d9e7/go.mod h1:hZqky4HEzsKwvLOt4QJlZUrjeQmb4UCZUhDP2HyQFfc= github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038 
h1:n9J0rwVWXDpNd5iZnwY7w4WZyq53/rROeI7OVvLW8Ok= github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038/go.mod h1:n2TSygSNwsLJ76m8qFXTSc7beTb+auJxYdqrnoqwZWE= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= diff --git a/lib/telemetry.go b/lib/telemetry.go index f6af9739243ca..2d87707c33181 100644 --- a/lib/telemetry.go +++ b/lib/telemetry.go @@ -324,7 +324,7 @@ func circonusSink(cfg TelemetryConfig, _ string) (metrics.MetricSink, error) { return sink, nil } -func configureSinks(cfg TelemetryConfig, memSink metrics.MetricSink) (metrics.FanoutSink, error) { +func configureSinks(cfg TelemetryConfig, memSink metrics.MetricSink, extraSinks []metrics.MetricSink) (metrics.FanoutSink, error) { metricsConf := metrics.DefaultConfig(cfg.MetricsPrefix) metricsConf.EnableHostname = !cfg.DisableHostname metricsConf.FilterDefault = cfg.FilterDefault @@ -349,6 +349,11 @@ func configureSinks(cfg TelemetryConfig, memSink metrics.MetricSink) (metrics.Fa addSink(dogstatdSink) addSink(circonusSink) addSink(prometheusSink) + for _, sink := range extraSinks { + if sink != nil { + sinks = append(sinks, sink) + } + } if len(sinks) > 0 { sinks = append(sinks, memSink) @@ -364,7 +369,7 @@ func configureSinks(cfg TelemetryConfig, memSink metrics.MetricSink) (metrics.Fa // values as returned by Runtimecfg.Config(). // InitTelemetry retries configurating the sinks in case error is retriable // and retry_failed_connection is set to true. 
-func InitTelemetry(cfg TelemetryConfig, logger hclog.Logger) (*MetricsConfig, error) { +func InitTelemetry(cfg TelemetryConfig, logger hclog.Logger, extraSinks ...metrics.MetricSink) (*MetricsConfig, error) { if cfg.Disable { return nil, nil } @@ -384,7 +389,7 @@ func InitTelemetry(cfg TelemetryConfig, logger hclog.Logger) (*MetricsConfig, er } for { logger.Warn("retrying configure metric sinks", "retries", waiter.Failures()) - _, err := configureSinks(cfg, memSink) + _, err := configureSinks(cfg, memSink, extraSinks) if err == nil { logger.Info("successfully configured metrics sinks") return @@ -397,7 +402,7 @@ func InitTelemetry(cfg TelemetryConfig, logger hclog.Logger) (*MetricsConfig, er } } - if _, errs := configureSinks(cfg, memSink); errs != nil { + if _, errs := configureSinks(cfg, memSink, extraSinks); errs != nil { if isRetriableError(errs) && cfg.RetryFailedConfiguration { logger.Warn("failed configure sinks", "error", multierror.Flatten(errs)) ctx, cancel = context.WithCancel(context.Background()) diff --git a/lib/telemetry_test.go b/lib/telemetry_test.go index c8649f0fd7644..a2c0075598ecc 100644 --- a/lib/telemetry_test.go +++ b/lib/telemetry_test.go @@ -10,6 +10,8 @@ import ( "testing" "github.com/hashicorp/consul/logging" + + "github.com/armon/go-metrics" "github.com/hashicorp/go-multierror" "github.com/stretchr/testify/require" ) @@ -24,15 +26,16 @@ func newCfg() TelemetryConfig { func TestConfigureSinks(t *testing.T) { cfg := newCfg() - sinks, err := configureSinks(cfg, nil) + extraSinks := []metrics.MetricSink{&metrics.BlackholeSink{}} + sinks, err := configureSinks(cfg, nil, extraSinks) require.Error(t, err) - // 3 sinks: statsd, statsite, inmem - require.Equal(t, 3, len(sinks)) + // 4 sinks: statsd, statsite, inmem, extra sink (blackhole) + require.Equal(t, 4, len(sinks)) cfg = TelemetryConfig{ DogstatsdAddr: "", } - _, err = configureSinks(cfg, nil) + _, err = configureSinks(cfg, nil, nil) require.NoError(t, err) }