diff --git a/exporter/datadogexporter/config.go b/exporter/datadogexporter/config.go index 39b131dfde07..da66b0bc5975 100644 --- a/exporter/datadogexporter/config.go +++ b/exporter/datadogexporter/config.go @@ -9,6 +9,7 @@ import ( "fmt" "regexp" "strings" + "time" "github.com/DataDog/datadog-agent/pkg/util/hostname/validate" "go.opentelemetry.io/collector/component" @@ -411,6 +412,11 @@ type HostMetadataConfig struct { // These tags will be attached to telemetry signals that have the host metadata hostname. // To attach tags to telemetry signals regardless of the host, use a processor instead. Tags []string `mapstructure:"tags"` + + // sourceTimeout is the timeout to fetch from each provider - for example AWS IMDS. + // If unset, or set to zero duration, there will be no timeout applied. + // Default is no timeout. + sourceTimeout time.Duration } // Config defines configuration for the Datadog exporter. diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index 3ae7f2d42e26..fa6a6957c0e2 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -106,9 +106,9 @@ type factory struct { registry *featuregate.Registry } -func (f *factory) SourceProvider(set component.TelemetrySettings, configHostname string) (source.Provider, error) { +func (f *factory) SourceProvider(set component.TelemetrySettings, configHostname string, timeout time.Duration) (source.Provider, error) { f.onceProvider.Do(func() { - f.sourceProvider, f.providerErr = hostmetadata.GetSourceProvider(set, configHostname) + f.sourceProvider, f.providerErr = hostmetadata.GetSourceProvider(set, configHostname, timeout) }) return f.sourceProvider, f.providerErr } @@ -289,7 +289,7 @@ func (f *factory) createMetricsExporter( c component.Config, ) (exporter.Metrics, error) { cfg := checkAndCastConfig(c, set.TelemetrySettings.Logger) - hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname) + hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.sourceTimeout) if err != nil { return nil, fmt.Errorf("failed to build hostname provider: %w", err) } @@ -409,7 +409,7 @@ func (f *factory) createTracesExporter( wg sync.WaitGroup // waits for agent to exit ) - hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname) + hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.sourceTimeout) if err != nil { return nil, fmt.Errorf("failed to build hostname provider: %w", err) } @@ -496,7 +496,7 @@ func (f *factory) createLogsExporter( var pusher consumer.ConsumeLogsFunc var logsAgent logsagentpipeline.LogsAgent - hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname) + hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.sourceTimeout) if err != nil { return nil, fmt.Errorf("failed to build hostname provider: %w", err) } diff --git a/exporter/datadogexporter/factory_test.go b/exporter/datadogexporter/factory_test.go index fa1137d1417a..e01b1521fad4 100644 --- a/exporter/datadogexporter/factory_test.go +++ b/exporter/datadogexporter/factory_test.go @@ -706,6 +706,7 @@ func TestOnlyMetadata(t *testing.T) { HostMetadata: HostMetadataConfig{ Enabled: true, HostnameSource: HostnameSourceFirstResource, + sourceTimeout: 50 * time.Millisecond, }, } diff --git a/exporter/datadogexporter/internal/hostmetadata/host.go b/exporter/datadogexporter/internal/hostmetadata/host.go index 8818bc99d656..c99e689ac73b 100644 --- a/exporter/datadogexporter/internal/hostmetadata/host.go +++ b/exporter/datadogexporter/internal/hostmetadata/host.go @@ -5,6 +5,7 @@ package hostmetadata // import "github.com/open-telemetry/opentelemetry-collecto import ( "fmt" + "time" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source" "go.opentelemetry.io/collector/component" @@ -27,7 +28,7 @@ var _ = featuregate.GlobalRegistry().MustRegister( featuregate.WithRegisterToVersion("0.75.0"), ) -func GetSourceProvider(set component.TelemetrySettings, configHostname string) (source.Provider, error) { +func GetSourceProvider(set component.TelemetrySettings, configHostname string, timeout time.Duration) (source.Provider, error) { ecs, err := ecs.NewProvider(set) if err != nil { return nil, fmt.Errorf("failed to build ECS Fargate provider: %w", err) @@ -69,6 +70,7 @@ func GetSourceProvider(set component.TelemetrySettings, configHostname string) ( "system": system.NewProvider(set.Logger), }, []string{"config", "azure", "ecs", "ec2", "gcp", "kubernetes", "system"}, + timeout, ) if err != nil { diff --git a/exporter/datadogexporter/internal/hostmetadata/host_test.go b/exporter/datadogexporter/internal/hostmetadata/host_test.go index 0ee989751839..f4a45c947124 100644 --- a/exporter/datadogexporter/internal/hostmetadata/host_test.go +++ b/exporter/datadogexporter/internal/hostmetadata/host_test.go @@ -6,6 +6,7 @@ package hostmetadata import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -13,7 +14,7 @@ import ( ) func TestHost(t *testing.T) { - p, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "test-host") + p, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "test-host", 31*time.Second) require.NoError(t, err) src, err := p.Source(context.Background()) require.NoError(t, err) diff --git a/exporter/datadogexporter/internal/hostmetadata/internal/ec2/ec2.go b/exporter/datadogexporter/internal/hostmetadata/internal/ec2/ec2.go index 0b53df8fc78b..d0920ecc5856 100644 --- a/exporter/datadogexporter/internal/hostmetadata/internal/ec2/ec2.go +++ b/exporter/datadogexporter/internal/hostmetadata/internal/ec2/ec2.go @@ -43,7 +43,7 @@ func isDefaultHostname(hostname string) bool { } // GetHostInfo gets the hostname info from EC2 metadata -func GetHostInfo(logger *zap.Logger) (hostInfo *HostInfo) { +func GetHostInfo(ctx context.Context, logger *zap.Logger) (hostInfo *HostInfo) { sess, err := session.NewSession() hostInfo = &HostInfo{} @@ -54,18 +54,18 @@ func GetHostInfo(logger *zap.Logger) (hostInfo *HostInfo) { meta := ec2metadata.New(sess) - if !meta.Available() { + if !meta.AvailableWithContext(ctx) { logger.Debug("EC2 Metadata not available") return } - if idDoc, err := meta.GetInstanceIdentityDocument(); err == nil { + if idDoc, err := meta.GetInstanceIdentityDocumentWithContext(ctx); err == nil { hostInfo.InstanceID = idDoc.InstanceID } else { logger.Warn("Failed to get EC2 instance id document", zap.Error(err)) } - if ec2Hostname, err := meta.GetMetadata("hostname"); err == nil { + if ec2Hostname, err := meta.GetMetadataWithContext(ctx, "hostname"); err == nil { hostInfo.EC2Hostname = ec2Hostname } else { logger.Warn("Failed to get EC2 hostname", zap.Error(err)) @@ -104,12 +104,12 @@ func NewProvider(logger *zap.Logger) (*Provider, error) { }, nil } -func (p *Provider) fillHostInfo() { - p.once.Do(func() { p.hostInfo = *GetHostInfo(p.logger) }) +func (p *Provider) fillHostInfo(ctx context.Context) { + p.once.Do(func() { p.hostInfo = *GetHostInfo(ctx, p.logger) }) } -func (p *Provider) Source(_ context.Context) (source.Source, error) { - p.fillHostInfo() +func (p *Provider) Source(ctx context.Context) (source.Source, error) { + p.fillHostInfo(ctx) if p.hostInfo.InstanceID == "" { return source.Source{}, fmt.Errorf("instance ID is unavailable") } @@ -175,6 +175,6 @@ func (p *Provider) ClusterName(ctx context.Context) (string, error) { } func (p *Provider) HostInfo() *HostInfo { - p.fillHostInfo() + p.fillHostInfo(context.Background()) return &p.hostInfo } diff --git a/exporter/datadogexporter/internal/hostmetadata/metadata.go b/exporter/datadogexporter/internal/hostmetadata/metadata.go index 66299aeddb50..1fcfe2806e2f 100644 --- a/exporter/datadogexporter/internal/hostmetadata/metadata.go +++ b/exporter/datadogexporter/internal/hostmetadata/metadata.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -// Package metadata is responsible for collecting host metadata from different providers +// Package hostmetadata is responsible for collecting host metadata from different providers // such as EC2, ECS, AWS, etc and pushing it to Datadog. package hostmetadata // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/hostmetadata" @@ -77,7 +77,7 @@ func fillHostMetadata(params exporter.Settings, pcfg PusherConfig, p source.Prov hm.Processes = gohai.NewProcessesPayload(hm.Meta.Hostname, params.Logger) // EC2 data was not set from attributes if hm.Meta.EC2Hostname == "" { - ec2HostInfo := ec2.GetHostInfo(params.Logger) + ec2HostInfo := ec2.GetHostInfo(context.Background(), params.Logger) hm.Meta.EC2Hostname = ec2HostInfo.EC2Hostname hm.Meta.InstanceID = ec2HostInfo.InstanceID } diff --git a/exporter/datadogexporter/internal/hostmetadata/metadata_test.go b/exporter/datadogexporter/internal/hostmetadata/metadata_test.go index ca0644e58cf6..9bba10d60957 100644 --- a/exporter/datadogexporter/internal/hostmetadata/metadata_test.go +++ b/exporter/datadogexporter/internal/hostmetadata/metadata_test.go @@ -66,7 +66,7 @@ func TestFillHostMetadata(t *testing.T) { ConfigTags: []string{"key1:tag1", "key2:tag2", "env:prod"}, } - hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "hostname") + hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "hostname", 31*time.Second) require.NoError(t, err) metadata := payload.NewEmpty() @@ -234,7 +234,7 @@ func TestPusher(t *testing.T) { params := exportertest.NewNopSettings() params.BuildInfo = mockBuildInfo - hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "source-hostname") + hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "source-hostname", 31*time.Second) require.NoError(t, err) attrs := testutil.NewAttributeMap(map[string]string{ diff --git a/exporter/datadogexporter/internal/hostmetadata/provider/provider.go b/exporter/datadogexporter/internal/hostmetadata/provider/provider.go index c713309cb9bd..20838742f52f 100644 --- a/exporter/datadogexporter/internal/hostmetadata/provider/provider.go +++ b/exporter/datadogexporter/internal/hostmetadata/provider/provider.go @@ -8,6 +8,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source" "go.uber.org/zap" @@ -19,6 +20,7 @@ type chainProvider struct { logger *zap.Logger providers map[string]source.Provider priorityList []string + timeout time.Duration } func (p *chainProvider) Source(ctx context.Context) (source.Source, error) { @@ -32,6 +34,16 @@ func (p *chainProvider) Source(ctx context.Context) (source.Source, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() + // Make a different context for our provider calls, to differentiate between a provider timing out and the entire + // context being cancelled + var childCtx context.Context + if p.timeout != 0 { + childCtx, cancel = context.WithTimeout(ctx, p.timeout) + } else { + childCtx, cancel = context.WithCancel(ctx) + } + defer cancel() + // Run all providers in parallel replies := make([]chan reply, len(p.priorityList)) for i, source := range p.priorityList { @@ -39,7 +51,7 @@ func (p *chainProvider) Source(ctx context.Context) (source.Source, error) { replies[i] = make(chan reply) p.logger.Debug("Trying out source provider", zap.String("provider", source)) go func(i int) { - src, err := provider.Source(ctx) + src, err := provider.Source(childCtx) replies[i] <- reply{src: src, err: err} }(i) } @@ -65,14 +77,14 @@ func (p *chainProvider) Source(ctx context.Context) (source.Source, error) { } // Chain providers into a single provider that returns the first available hostname. -func Chain(logger *zap.Logger, providers map[string]source.Provider, priorityList []string) (source.Provider, error) { +func Chain(logger *zap.Logger, providers map[string]source.Provider, priorityList []string, timeout time.Duration) (source.Provider, error) { for _, source := range priorityList { if _, ok := providers[source]; !ok { return nil, fmt.Errorf("%q source is not available in providers", source) } } - return &chainProvider{logger: logger, providers: providers, priorityList: priorityList}, nil + return &chainProvider{logger: logger, providers: providers, priorityList: priorityList, timeout: timeout}, nil } var _ source.Provider = (*configProvider)(nil) diff --git a/exporter/datadogexporter/internal/hostmetadata/provider/provider_test.go b/exporter/datadogexporter/internal/hostmetadata/provider/provider_test.go index 7e5fdf1afdc9..d3390801fc8a 100644 --- a/exporter/datadogexporter/internal/hostmetadata/provider/provider_test.go +++ b/exporter/datadogexporter/internal/hostmetadata/provider/provider_test.go @@ -6,6 +6,7 @@ package provider // import "github.com/open-telemetry/opentelemetry-collector-co import ( "context" "errors" + "fmt" "testing" "time" @@ -38,8 +39,12 @@ type delayedProvider struct { } func (p *delayedProvider) Source(ctx context.Context) (source.Source, error) { - time.Sleep(p.delay) - return p.provider.Source(ctx) + select { + case <-ctx.Done(): + return source.Source{}, fmt.Errorf("no source provider was available") + case <-time.After(p.delay): + return p.provider.Source(ctx) + } } func withDelay(provider source.Provider, delay time.Duration) source.Provider { @@ -56,6 +61,7 @@ func TestChain(t *testing.T) { hostname string queryErr string + timeout time.Duration }{ { name: "missing provider in priority list", @@ -78,6 +84,18 @@ func TestChain(t *testing.T) { queryErr: "no source provider was available", }, + { + name: "all providers timeout", + providers: map[string]source.Provider{ + "p1": withDelay(HostProvider("p1SourceName"), 100*time.Millisecond), + "p2": withDelay(HostProvider("p2SourceName"), 100*time.Millisecond), + "p3": withDelay(HostProvider("p3SourceName"), 100*time.Millisecond), + }, + priorityList: []string{"p1", "p2", "p3"}, + + queryErr: "no source provider was available", + timeout: 10 * time.Millisecond, + }, { name: "no providers fail", providers: map[string]source.Provider{ @@ -111,11 +129,23 @@ func TestChain(t *testing.T) { hostname: "p2SourceName", }, + { + name: "p2 times out", + providers: map[string]source.Provider{ + "p1": ErrorSourceProvider("p1Err"), + "p2": withDelay(HostProvider("p2SourceName"), 50*time.Millisecond), + "p3": HostProvider("p3SourceName"), + }, + priorityList: []string{"p1", "p2", "p3"}, + + hostname: "p3SourceName", + timeout: 10 * time.Millisecond, + }, } for _, testInstance := range tests { t.Run(testInstance.name, func(t *testing.T) { - provider, err := Chain(zaptest.NewLogger(t), testInstance.providers, testInstance.priorityList) + provider, err := Chain(zaptest.NewLogger(t), testInstance.providers, testInstance.priorityList, testInstance.timeout) if err != nil || testInstance.buildErr != "" { assert.EqualError(t, err, testInstance.buildErr) return diff --git a/exporter/datadogexporter/metrics_exporter_test.go b/exporter/datadogexporter/metrics_exporter_test.go index 0455ed3bc6a4..3df4efecb6b7 100644 --- a/exporter/datadogexporter/metrics_exporter_test.go +++ b/exporter/datadogexporter/metrics_exporter_test.go @@ -58,6 +58,9 @@ func TestNewExporter(t *testing.T) { CumulativeMonotonicMode: CumulativeMonotonicSumModeToDelta, }, }, + HostMetadata: HostMetadataConfig{ + sourceTimeout: 50 * time.Millisecond, + }, } params := exportertest.NewNopSettings() f := NewFactory()