Skip to content

Commit

Permalink
PR review updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Achooo committed Jul 31, 2023
1 parent 3b031a2 commit cb869b1
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 79 deletions.
12 changes: 6 additions & 6 deletions agent/hcp/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,19 @@ func sink(
return nil, nil
}

cfgProvider, err := NewTelemetryConfigProvider(&TelemetryConfigProviderOpts{
Ctx: ctx,
MetricsConfig: telemetryCfg.MetricsConfig,
HCPClient: hcpClient,
RefreshInterval: telemetryCfg.RefreshConfig.RefreshInterval,
cfgProvider, err := NewHCPProviderImpl(ctx, &providerParams{
metricsConfig: telemetryCfg.MetricsConfig,
hcpClient: hcpClient,
refreshInterval: telemetryCfg.RefreshConfig.RefreshInterval,
})
if err != nil {
return nil, fmt.Errorf("failed to init config provider: %w", err)
}

reader := telemetry.NewOTELReader(metricsClient, cfgProvider, telemetry.DefaultExportInterval)
sinkOpts := &telemetry.OTELSinkOpts{
Ctx: ctx,
Reader: telemetry.NewOTELReader(metricsClient, cfgProvider, telemetry.DefaultExportInterval),
Reader: reader,
ConfigProvider: cfgProvider,
}

Expand Down
72 changes: 36 additions & 36 deletions agent/hcp/telemetry_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,23 @@ import (
"github.com/mitchellh/hashstructure/v2"

hcpclient "github.com/hashicorp/consul/agent/hcp/client"
hcpTelemetry "github.com/hashicorp/consul/agent/hcp/telemetry"
)

var (
// internalMetricRefreshFailure is a metric to monitor refresh failures.
internalMetricRefreshFailure []string = []string{"hcp", "telemetry_config_provider", "refresh", "failure"}
)

// TelemetryConfigProviderOpts is used to initialize a telemetryConfigProvider.
type TelemetryConfigProviderOpts struct {
Ctx context.Context
MetricsConfig *hcpclient.MetricsConfig
RefreshInterval time.Duration
HCPClient hcpclient.Client
// Ensure implementation of the telemetry provider interfaces.
var _ hcpTelemetry.ConfigProvider = &hcpProviderImpl{}
var _ hcpTelemetry.EndpointProvider = &hcpProviderImpl{}

// providerParams is used to initialize a hcpProviderImpl.
type providerParams struct {
metricsConfig *hcpclient.MetricsConfig
refreshInterval time.Duration
hcpClient hcpclient.Client
}

// dynamicConfig is a set of configurable settings for metrics collection, processing and export.
Expand All @@ -37,66 +41,61 @@ type dynamicConfig struct {
refreshInterval time.Duration
}

// telemetryConfigProvider holds metrics configuration and settings for continuous fetch of new config from HCP.
type telemetryConfigProvider struct {
// telemetryConfig holds configuration that can be dynamically updated
// hcpProviderImpl holds metrics configuration and settings for continuous fetch of new config from HCP.
type hcpProviderImpl struct {
// cfg holds configuration that can be dynamically updated
// based on updates fetched from HCP.
cfg *dynamicConfig
// telemetryConfigHash is used to compare two telemetryConfig objects to see if they are the same.
// cfgHash is used to compare two telemetryConfig objects to see if they are the same.
cfgHash uint64

// a reader-writer mutex is used as the provider is read heavy, as the OTEL components
// access telemetryConfig, while config is only updated (write) when there are changes.
rw sync.RWMutex
logger hclog.Logger
hcpClient hcpclient.Client
}

func NewTelemetryConfigProvider(opts *TelemetryConfigProviderOpts) (*telemetryConfigProvider, error) {
if opts.Ctx == nil {
return nil, fmt.Errorf("missing ctx")
}

if opts.HCPClient == nil {
func NewHCPProviderImpl(ctx context.Context, params *providerParams) (*hcpProviderImpl, error) {
if params.hcpClient == nil {
return nil, fmt.Errorf("missing HCP client")
}

if opts.MetricsConfig == nil {
if params.metricsConfig == nil {
return nil, fmt.Errorf("missing metrics config")
}

if opts.RefreshInterval <= 0 {
// TODO: should this be 0, to disable refresh?
if params.refreshInterval <= 0 {
return nil, fmt.Errorf("invalid refresh interval")
}

cfg := &dynamicConfig{
endpoint: opts.MetricsConfig.Endpoint,
labels: opts.MetricsConfig.Labels,
filters: opts.MetricsConfig.Filters,
refreshInterval: opts.RefreshInterval,
endpoint: params.metricsConfig.Endpoint,
labels: params.metricsConfig.Labels,
filters: params.metricsConfig.Filters,
refreshInterval: params.refreshInterval,
}

hash, err := calculateHash(cfg)
if err != nil {
return nil, fmt.Errorf("failed to calculate hash: %w", err)
}

t := &telemetryConfigProvider{
t := &hcpProviderImpl{
cfg: cfg,
cfgHash: hash,
logger: hclog.FromContext(opts.Ctx).Named("telemetry_config_provider"),
hcpClient: opts.HCPClient,
hcpClient: params.hcpClient,
}

go t.run(opts.Ctx, opts.RefreshInterval)
go t.run(ctx)

return t, nil
}

// run continuously checks for updates to the telemetry configuration by making a request to HCP.
// Modification of config only occurs if changes are detected to decrease write locks that block read locks.
func (t *telemetryConfigProvider) run(ctx context.Context, refreshInterval time.Duration) {
ticker := time.NewTicker(refreshInterval)
func (t *hcpProviderImpl) run(ctx context.Context) {
ticker := time.NewTicker(t.cfg.refreshInterval)
defer ticker.Stop()
for {
select {
Expand All @@ -113,16 +112,17 @@ func (t *telemetryConfigProvider) run(ctx context.Context, refreshInterval time.

// checkUpdate makes an HTTP request to HCP to return a new metrics configuration and true, if config changed.
// checkUpdate does not update the metricsConfig field to prevent acquiring the write lock unnecessarily.
func (t *telemetryConfigProvider) checkUpdate(ctx context.Context) (*dynamicConfig, bool) {
func (t *hcpProviderImpl) checkUpdate(ctx context.Context) (*dynamicConfig, bool) {
t.rw.RLock()
defer t.rw.RUnlock()
logger := hclog.FromContext(ctx).Named("telemetry_config_provider")

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

telemetryCfg, err := t.hcpClient.FetchTelemetryConfig(ctx)
if err != nil {
t.logger.Error("failed to fetch telemetry config from HCP", "error", err)
logger.Error("failed to fetch telemetry config from HCP", "error", err)
goMetrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil, false
}
Expand All @@ -136,7 +136,7 @@ func (t *telemetryConfigProvider) checkUpdate(ctx context.Context) (*dynamicConf

newHash, err := calculateHash(newDynamicConfig)
if err != nil {
t.logger.Error("failed to calculate hash for new config", "error", err)
logger.Error("failed to calculate hash for new config", "error", err)
goMetrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil, false
}
Expand All @@ -145,31 +145,31 @@ func (t *telemetryConfigProvider) checkUpdate(ctx context.Context) (*dynamicConf
}

// modifyTelemetryConfig acquires a write lock to replace the current config with the given dynamicConfig object.
func (t *telemetryConfigProvider) modifyTelemetryConfig(newCfg *dynamicConfig) {
func (t *hcpProviderImpl) modifyTelemetryConfig(newCfg *dynamicConfig) {
t.rw.Lock()
defer t.rw.Unlock()

t.cfg = newCfg
}

// GetEndpoint acquires a read lock to return endpoint configuration for consumers.
func (t *telemetryConfigProvider) GetEndpoint() *url.URL {
func (t *hcpProviderImpl) GetEndpoint() *url.URL {
t.rw.RLock()
defer t.rw.RUnlock()

return t.cfg.endpoint
}

// GetFilters acquires a read lock to return filters configuration for consumers.
func (t *telemetryConfigProvider) GetFilters() *regexp.Regexp {
func (t *hcpProviderImpl) GetFilters() *regexp.Regexp {
t.rw.RLock()
defer t.rw.RUnlock()

return t.cfg.filters
}

// GetLabels acquires a read lock to return labels configuration for consumers.
func (t *telemetryConfigProvider) GetLabels() map[string]string {
func (t *hcpProviderImpl) GetLabels() map[string]string {
t.rw.RLock()
defer t.rw.RUnlock()

Expand Down
67 changes: 30 additions & 37 deletions agent/hcp/telemetry_config_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,52 +28,43 @@ type testConfig struct {
func TestNewTelemetryConfigProvider(t *testing.T) {
t.Parallel()
for name, tc := range map[string]struct {
opts *TelemetryConfigProviderOpts
opts *providerParams
wantErr string
}{
"success": {
opts: &TelemetryConfigProviderOpts{
Ctx: context.Background(),
HCPClient: hcpclient.NewMockClient(t),
MetricsConfig: &hcpclient.MetricsConfig{},
RefreshInterval: 1 * time.Second,
opts: &providerParams{
hcpClient: hcpclient.NewMockClient(t),
metricsConfig: &hcpclient.MetricsConfig{},
refreshInterval: 1 * time.Second,
},
},
"failsWithMissingContext": {
opts: &TelemetryConfigProviderOpts{
HCPClient: hcpclient.NewMockClient(t),
MetricsConfig: &hcpclient.MetricsConfig{},
},
wantErr: "missing ctx",
},
"failsWithMissingHCPClient": {
opts: &TelemetryConfigProviderOpts{
Ctx: context.Background(),
MetricsConfig: &hcpclient.MetricsConfig{},
opts: &providerParams{
metricsConfig: &hcpclient.MetricsConfig{},
},
wantErr: "missing HCP client",
},
"failsWithMissingMetricsConfig": {
opts: &TelemetryConfigProviderOpts{
Ctx: context.Background(),
HCPClient: hcpclient.NewMockClient(t),
opts: &providerParams{
hcpClient: hcpclient.NewMockClient(t),
},
wantErr: "missing metrics config",
},
"failsWithInvalidRefreshInterval": {
opts: &TelemetryConfigProviderOpts{
Ctx: context.Background(),
HCPClient: hcpclient.NewMockClient(t),
MetricsConfig: &hcpclient.MetricsConfig{},
RefreshInterval: 0 * time.Second,
opts: &providerParams{
hcpClient: hcpclient.NewMockClient(t),
metricsConfig: &hcpclient.MetricsConfig{},
refreshInterval: 0 * time.Second,
},
wantErr: "invalid refresh interval",
},
} {
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
cfgProvider, err := NewTelemetryConfigProvider(tc.opts)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfgProvider, err := NewHCPProviderImpl(ctx, tc.opts)
if tc.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.wantErr)
Expand Down Expand Up @@ -139,16 +130,16 @@ func TestTelemetryConfigProvider_Success(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opts := &TelemetryConfigProviderOpts{
MetricsConfig: optsCfg.MetricsConfig,
Ctx: ctx,
HCPClient: mockClient,
RefreshInterval: defaultTestRefreshInterval,
opts := &providerParams{
metricsConfig: optsCfg.MetricsConfig,
hcpClient: mockClient,
refreshInterval: defaultTestRefreshInterval,
}

configProvider, err := NewTelemetryConfigProvider(opts)
configProvider, err := NewHCPProviderImpl(ctx, opts)
require.NoError(t, err)

// TODO: Test this by having access to the ticker directly.
require.EventuallyWithTf(t, func(c *assert.CollectT) {
assert.Equal(c, tc.expected.endpoint, configProvider.GetEndpoint().String())
assert.Equal(c, tc.expected.filters, configProvider.GetFilters().String())
Expand Down Expand Up @@ -194,18 +185,18 @@ func TestTelemetryConfigProvider_UpdateFailuresWithMetrics(t *testing.T) {
mockClient := hcpclient.NewMockClient(t)
tc.expect(mockClient)

opts := &TelemetryConfigProviderOpts{
Ctx: ctx,
MetricsConfig: telemetryConfig.MetricsConfig,
HCPClient: mockClient,
RefreshInterval: defaultTestRefreshInterval,
opts := &providerParams{
metricsConfig: telemetryConfig.MetricsConfig,
hcpClient: mockClient,
refreshInterval: defaultTestRefreshInterval,
}

configProvider, err := NewTelemetryConfigProvider(opts)
configProvider, err := NewHCPProviderImpl(ctx, opts)
require.NoError(t, err)

// Eventually tries to run assertions every 100 ms to verify
// if failure metrics and the dynamic config have been updated as expected.
// TODO: Use ticker directly to test this.
require.EventuallyWithTf(t, func(c *assert.CollectT) {
// Collect sink metrics.
key := serviceName + "." + strings.Join(internalMetricRefreshFailure, ".")
Expand All @@ -228,6 +219,8 @@ func TestTelemetryConfigProvider_UpdateFailuresWithMetrics(t *testing.T) {
}
}

// TODO: Add race test.

func telemetryConfig(testCfg *testConfig) (*hcpclient.TelemetryConfig, error) {
filters, err := regexp.Compile(testCfg.filters)
if err != nil {
Expand Down

0 comments on commit cb869b1

Please sign in to comment.