diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go
index 86663ec5d203..0df972613f0c 100644
--- a/agent/hcp/deps.go
+++ b/agent/hcp/deps.go
@@ -77,19 +77,19 @@ func sink(
 		return nil, nil
 	}
 
-	cfgProvider, err := NewTelemetryConfigProvider(&TelemetryConfigProviderOpts{
-		Ctx:             ctx,
-		MetricsConfig:   telemetryCfg.MetricsConfig,
-		HCPClient:       hcpClient,
-		RefreshInterval: telemetryCfg.RefreshConfig.RefreshInterval,
+	cfgProvider, err := NewHCPProviderImpl(ctx, &providerParams{
+		metricsConfig:   telemetryCfg.MetricsConfig,
+		hcpClient:       hcpClient,
+		refreshInterval: telemetryCfg.RefreshConfig.RefreshInterval,
 	})
 	if err != nil {
 		return nil, fmt.Errorf("failed to init config provider: %w", err)
 	}
 
+	reader := telemetry.NewOTELReader(metricsClient, cfgProvider, telemetry.DefaultExportInterval)
 	sinkOpts := &telemetry.OTELSinkOpts{
 		Ctx:            ctx,
-		Reader:         telemetry.NewOTELReader(metricsClient, cfgProvider, telemetry.DefaultExportInterval),
+		Reader:         reader,
 		ConfigProvider: cfgProvider,
 	}
 
diff --git a/agent/hcp/telemetry_config_provider.go b/agent/hcp/telemetry_config_provider.go
index d98cfd951a9f..3b6d5b3a0e3b 100644
--- a/agent/hcp/telemetry_config_provider.go
+++ b/agent/hcp/telemetry_config_provider.go
@@ -13,6 +13,7 @@ import (
 	"github.com/mitchellh/hashstructure/v2"
 
 	hcpclient "github.com/hashicorp/consul/agent/hcp/client"
+	hcpTelemetry "github.com/hashicorp/consul/agent/hcp/telemetry"
 )
 
 var (
@@ -20,12 +21,15 @@ var (
 	internalMetricRefreshFailure []string = []string{"hcp", "telemetry_config_provider", "refresh", "failure"}
 )
 
-// TelemetryConfigProviderOpts is used to initialize a telemetryConfigProvider.
-type TelemetryConfigProviderOpts struct {
-	Ctx             context.Context
-	MetricsConfig   *hcpclient.MetricsConfig
-	RefreshInterval time.Duration
-	HCPClient       hcpclient.Client
+// Ensure implementation of the telemetry provider interfaces.
+var _ hcpTelemetry.ConfigProvider = &hcpProviderImpl{}
+var _ hcpTelemetry.EndpointProvider = &hcpProviderImpl{}
+
+// providerParams is used to initialize a hcpProviderImpl.
+type providerParams struct {
+	metricsConfig   *hcpclient.MetricsConfig
+	refreshInterval time.Duration
+	hcpClient       hcpclient.Client
 }
 
 // dynamicConfig is a set of configurable settings for metrics collection, processing and export.
@@ -37,43 +41,39 @@ type dynamicConfig struct {
 	refreshInterval time.Duration
 }
 
-// telemetryConfigProvider holds metrics configuration and settings for continuous fetch of new config from HCP.
-type telemetryConfigProvider struct {
-	// telemetryConfig holds configuration that can be dynamically updated
+// hcpProviderImpl holds metrics configuration and settings for continuous fetch of new config from HCP.
+type hcpProviderImpl struct {
+	// cfg holds configuration that can be dynamically updated
 	// based on updates fetched from HCP.
 	cfg *dynamicConfig
 
-	// telemetryConfigHash is used to compare two telemetryConfig objects to see if they are the same.
+	// cfgHash is used to compare two telemetryConfig objects to see if they are the same.
 	cfgHash uint64
 
 	// a reader-writer mutex is used as the provider is read heavy, as the OTEL components
 	// access telemetryConfig, while config is only updated (write) when there are changes.
 	rw sync.RWMutex
 
-	logger hclog.Logger
 
 	hcpClient hcpclient.Client
 }
 
-func NewTelemetryConfigProvider(opts *TelemetryConfigProviderOpts) (*telemetryConfigProvider, error) {
-	if opts.Ctx == nil {
-		return nil, fmt.Errorf("missing ctx")
-	}
-
-	if opts.HCPClient == nil {
+func NewHCPProviderImpl(ctx context.Context, params *providerParams) (*hcpProviderImpl, error) {
+	if params.hcpClient == nil {
 		return nil, fmt.Errorf("missing HCP client")
 	}
 
-	if opts.MetricsConfig == nil {
+	if params.metricsConfig == nil {
 		return nil, fmt.Errorf("missing metrics config")
 	}
 
-	if opts.RefreshInterval <= 0 {
+	// TODO: should this be 0, to disable refresh?
+	if params.refreshInterval <= 0 {
 		return nil, fmt.Errorf("invalid refresh interval")
 	}
 
 	cfg := &dynamicConfig{
-		endpoint:        opts.MetricsConfig.Endpoint,
-		labels:          opts.MetricsConfig.Labels,
-		filters:         opts.MetricsConfig.Filters,
-		refreshInterval: opts.RefreshInterval,
+		endpoint:        params.metricsConfig.Endpoint,
+		labels:          params.metricsConfig.Labels,
+		filters:         params.metricsConfig.Filters,
+		refreshInterval: params.refreshInterval,
 	}
 
 	hash, err := calculateHash(cfg)
@@ -81,22 +81,21 @@ func NewTelemetryConfigProvider(opts *TelemetryConfigProviderOpts) (*telemetryCo
 		return nil, fmt.Errorf("failed to calculate hash: %w", err)
 	}
 
-	t := &telemetryConfigProvider{
+	t := &hcpProviderImpl{
 		cfg:       cfg,
 		cfgHash:   hash,
-		logger:    hclog.FromContext(opts.Ctx).Named("telemetry_config_provider"),
-		hcpClient: opts.HCPClient,
+		hcpClient: params.hcpClient,
 	}
 
-	go t.run(opts.Ctx, opts.RefreshInterval)
+	go t.run(ctx)
 
 	return t, nil
 }
 
 // run continously checks for updates to the telemetry configuration by making a request to HCP.
 // Modification of config only occurs if changes are detected to decrease write locks that block read locks.
-func (t *telemetryConfigProvider) run(ctx context.Context, refreshInterval time.Duration) {
-	ticker := time.NewTicker(refreshInterval)
+func (t *hcpProviderImpl) run(ctx context.Context) {
+	ticker := time.NewTicker(t.cfg.refreshInterval)
 	defer ticker.Stop()
 	for {
 		select {
@@ -113,16 +112,17 @@ func (t *telemetryConfigProvider) run(ctx context.Context, refreshInterval time.
 // checkUpdate makes a HTTP request to HCP to return a new metrics configuration and true, if config changed.
 // checkUpdate does not update the metricsConfig field to prevent acquiring the write lock unnecessarily.
-func (t *telemetryConfigProvider) checkUpdate(ctx context.Context) (*dynamicConfig, bool) {
+func (t *hcpProviderImpl) checkUpdate(ctx context.Context) (*dynamicConfig, bool) {
 	t.rw.RLock()
 	defer t.rw.RUnlock()
 
+	logger := hclog.FromContext(ctx).Named("telemetry_config_provider")
 	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
 	defer cancel()
 
 	telemetryCfg, err := t.hcpClient.FetchTelemetryConfig(ctx)
 	if err != nil {
-		t.logger.Error("failed to fetch telemetry config from HCP", "error", err)
+		logger.Error("failed to fetch telemetry config from HCP", "error", err)
 		goMetrics.IncrCounter(internalMetricRefreshFailure, 1)
 		return nil, false
 	}
@@ -136,7 +136,7 @@ func (t *telemetryConfigProvider) checkUpdate(ctx context.Context) (*dynamicConf
 
 	newHash, err := calculateHash(newDynamicConfig)
 	if err != nil {
-		t.logger.Error("failed to calculate hash for new config", "error", err)
+		logger.Error("failed to calculate hash for new config", "error", err)
 		goMetrics.IncrCounter(internalMetricRefreshFailure, 1)
 		return nil, false
 	}
@@ -145,7 +145,7 @@ func (t *telemetryConfigProvider) checkUpdate(ctx context.Context) (*dynamicConf
 }
 
 // modifynewTelemetryConfig acquires a write lock to modify it with a given newTelemetryConfig object.
-func (t *telemetryConfigProvider) modifyTelemetryConfig(newCfg *dynamicConfig) {
+func (t *hcpProviderImpl) modifyTelemetryConfig(newCfg *dynamicConfig) {
 	t.rw.Lock()
 	defer t.rw.Unlock()
 
@@ -153,7 +153,7 @@ func (t *telemetryConfigProvider) modifyTelemetryConfig(newCfg *dynamicConfig) {
 }
 
 // GetEndpoint acquires a read lock to return endpoint configuration for consumers.
-func (t *telemetryConfigProvider) GetEndpoint() *url.URL {
+func (t *hcpProviderImpl) GetEndpoint() *url.URL {
 	t.rw.RLock()
 	defer t.rw.RUnlock()
 
@@ -161,7 +161,7 @@ func (t *telemetryConfigProvider) GetEndpoint() *url.URL {
 }
 
 // GetFilters acquires a read lock to return filters configuration for consumers.
-func (t *telemetryConfigProvider) GetFilters() *regexp.Regexp {
+func (t *hcpProviderImpl) GetFilters() *regexp.Regexp {
 	t.rw.RLock()
 	defer t.rw.RUnlock()
 
@@ -169,7 +169,7 @@ func (t *telemetryConfigProvider) GetFilters() *regexp.Regexp {
 }
 
 // GetLabels acquires a read lock to return labels configuration for consumers.
-func (t *telemetryConfigProvider) GetLabels() map[string]string {
+func (t *hcpProviderImpl) GetLabels() map[string]string {
 	t.rw.RLock()
 	defer t.rw.RUnlock()
 
diff --git a/agent/hcp/telemetry_config_provider_test.go b/agent/hcp/telemetry_config_provider_test.go
index ce4d7004e64b..8ea4d751070a 100644
--- a/agent/hcp/telemetry_config_provider_test.go
+++ b/agent/hcp/telemetry_config_provider_test.go
@@ -28,44 +28,33 @@ type testConfig struct {
 
 func TestNewTelemetryConfigProvider(t *testing.T) {
 	t.Parallel()
 	for name, tc := range map[string]struct {
-		opts    *TelemetryConfigProviderOpts
+		opts    *providerParams
 		wantErr string
 	}{
 		"success": {
-			opts: &TelemetryConfigProviderOpts{
-				Ctx:             context.Background(),
-				HCPClient:       hcpclient.NewMockClient(t),
-				MetricsConfig:   &hcpclient.MetricsConfig{},
-				RefreshInterval: 1 * time.Second,
+			opts: &providerParams{
+				hcpClient:       hcpclient.NewMockClient(t),
+				metricsConfig:   &hcpclient.MetricsConfig{},
+				refreshInterval: 1 * time.Second,
 			},
 		},
-		"failsWithMissingContext": {
-			opts: &TelemetryConfigProviderOpts{
-				HCPClient:     hcpclient.NewMockClient(t),
-				MetricsConfig: &hcpclient.MetricsConfig{},
-			},
-			wantErr: "missing ctx",
-		},
 		"failsWithMissingHCPClient": {
-			opts: &TelemetryConfigProviderOpts{
-				Ctx:           context.Background(),
-				MetricsConfig: &hcpclient.MetricsConfig{},
+			opts: &providerParams{
+				metricsConfig: &hcpclient.MetricsConfig{},
 			},
 			wantErr: "missing HCP client",
 		},
 		"failsWithMissingMetricsConfig": {
-			opts: &TelemetryConfigProviderOpts{
-				Ctx:       context.Background(),
-				HCPClient: hcpclient.NewMockClient(t),
+			opts: &providerParams{
+				hcpClient: hcpclient.NewMockClient(t),
 			},
 			wantErr: "missing metrics config",
 		},
 		"failsWithInvalidRefreshInterval": {
-			opts: &TelemetryConfigProviderOpts{
-				Ctx:             context.Background(),
-				HCPClient:       hcpclient.NewMockClient(t),
-				MetricsConfig:   &hcpclient.MetricsConfig{},
-				RefreshInterval: 0 * time.Second,
+			opts: &providerParams{
+				hcpClient:       hcpclient.NewMockClient(t),
+				metricsConfig:   &hcpclient.MetricsConfig{},
+				refreshInterval: 0 * time.Second,
 			},
 			wantErr: "invalid refresh interval",
 		},
@@ -73,7 +62,9 @@ func TestNewTelemetryConfigProvider(t *testing.T) {
 		tc := tc
 		t.Run(name, func(t *testing.T) {
 			t.Parallel()
-			cfgProvider, err := NewTelemetryConfigProvider(tc.opts)
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			cfgProvider, err := NewHCPProviderImpl(ctx, tc.opts)
 			if tc.wantErr != "" {
 				require.Error(t, err)
 				require.Contains(t, err.Error(), tc.wantErr)
@@ -139,16 +130,16 @@ func TestTelemetryConfigProvider_Success(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 
-			opts := &TelemetryConfigProviderOpts{
-				MetricsConfig:   optsCfg.MetricsConfig,
-				Ctx:             ctx,
-				HCPClient:       mockClient,
-				RefreshInterval: defaultTestRefreshInterval,
+			opts := &providerParams{
+				metricsConfig:   optsCfg.MetricsConfig,
+				hcpClient:       mockClient,
+				refreshInterval: defaultTestRefreshInterval,
 			}
 
-			configProvider, err := NewTelemetryConfigProvider(opts)
+			configProvider, err := NewHCPProviderImpl(ctx, opts)
 			require.NoError(t, err)
+			// TODO: Test this by having access to the ticker directly.
 			require.EventuallyWithTf(t, func(c *assert.CollectT) {
 				assert.Equal(c, tc.expected.endpoint, configProvider.GetEndpoint().String())
 				assert.Equal(c, tc.expected.filters, configProvider.GetFilters().String())
@@ -194,18 +185,18 @@ func TestTelemetryConfigProvider_UpdateFailuresWithMetrics(t *testing.T) {
 			mockClient := hcpclient.NewMockClient(t)
 			tc.expect(mockClient)
 
-			opts := &TelemetryConfigProviderOpts{
-				Ctx:             ctx,
-				MetricsConfig:   telemetryConfig.MetricsConfig,
-				HCPClient:       mockClient,
-				RefreshInterval: defaultTestRefreshInterval,
+			opts := &providerParams{
+				metricsConfig:   telemetryConfig.MetricsConfig,
+				hcpClient:       mockClient,
+				refreshInterval: defaultTestRefreshInterval,
 			}
 
-			configProvider, err := NewTelemetryConfigProvider(opts)
+			configProvider, err := NewHCPProviderImpl(ctx, opts)
 			require.NoError(t, err)
 
 			// Eventually tries to run assertions every 100 ms to verify
 			// if failure metrics and the dynamic config have been updated as expected.
+			// TODO: Use ticker directly to test this.
 			require.EventuallyWithTf(t, func(c *assert.CollectT) {
 				// Collect sink metrics.
 				key := serviceName + "." + strings.Join(internalMetricRefreshFailure, ".")
@@ -228,6 +219,8 @@ func TestTelemetryConfigProvider_UpdateFailuresWithMetrics(t *testing.T) {
 	}
 }
 
+// TODO: Add race test.
+
 func telemetryConfig(testCfg *testConfig) (*hcpclient.TelemetryConfig, error) {
 	filters, err := regexp.Compile(testCfg.filters)
 	if err != nil {
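Note (not part of the patch): the last hunk adds a "// TODO: Add race test." marker. One possible shape for that test is sketched below. It is a rough sketch only: the test name is hypothetical, the testConfig field names, the telemetryConfig helper, and defaultTestRefreshInterval are assumed from their use elsewhere in this file, the mockery expecter API on hcpclient.MockClient and a testify "mock" plus "sync" import are assumed to be available in the test package, and the test is intended to be run with the -race flag.

// TestTelemetryConfigProvider_Race is a sketch of the race test flagged by the
// TODO above (hypothetical; assumes the package-level test helpers shown in this diff).
func TestTelemetryConfigProvider_Race(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// A fetched config that differs from the initial empty one forces run() to
	// take the write lock via modifyTelemetryConfig while readers hold read locks.
	updatedCfg, err := telemetryConfig(&testConfig{
		endpoint:        "https://test.com/v1/metrics",
		filters:         "test",
		labels:          map[string]string{"test_label": "123"},
		refreshInterval: defaultTestRefreshInterval,
	})
	require.NoError(t, err)

	mockClient := hcpclient.NewMockClient(t)
	// Maybe() tolerates any number of refresh ticks before the test ends.
	mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(updatedCfg, nil).Maybe()

	provider, err := NewHCPProviderImpl(ctx, &providerParams{
		metricsConfig:   &hcpclient.MetricsConfig{},
		hcpClient:       mockClient,
		refreshInterval: 10 * time.Millisecond,
	})
	require.NoError(t, err)

	// Hammer the read-locked getters from several goroutines while the
	// background run() goroutine refreshes and rewrites the config.
	var wg sync.WaitGroup
	deadline := time.Now().Add(100 * time.Millisecond)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for time.Now().Before(deadline) {
				_ = provider.GetEndpoint()
				_ = provider.GetFilters()
				_ = provider.GetLabels()
			}
		}()
	}
	wg.Wait()
}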