Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Source Timeout for Providers #33958

Merged
merged 13 commits into from
Jul 17, 2024
6 changes: 6 additions & 0 deletions exporter/datadogexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"regexp"
"strings"
"time"

"github.com/DataDog/datadog-agent/pkg/util/hostname/validate"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -411,6 +412,11 @@ type HostMetadataConfig struct {
// These tags will be attached to telemetry signals that have the host metadata hostname.
// To attach tags to telemetry signals regardless of the host, use a processor instead.
Tags []string `mapstructure:"tags"`

// sourceTimeout is the timeout to fetch from each provider - for example AWS IMDS.
// If unset, or set to zero duration, there will be no timeout applied.
// Default is no timeout.
sourceTimeout time.Duration
}

// Config defines configuration for the Datadog exporter.
Expand Down
10 changes: 5 additions & 5 deletions exporter/datadogexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ type factory struct {
registry *featuregate.Registry
}

func (f *factory) SourceProvider(set component.TelemetrySettings, configHostname string) (source.Provider, error) {
func (f *factory) SourceProvider(set component.TelemetrySettings, configHostname string, timeout time.Duration) (source.Provider, error) {
f.onceProvider.Do(func() {
f.sourceProvider, f.providerErr = hostmetadata.GetSourceProvider(set, configHostname)
f.sourceProvider, f.providerErr = hostmetadata.GetSourceProvider(set, configHostname, timeout)
})
return f.sourceProvider, f.providerErr
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func (f *factory) createMetricsExporter(
c component.Config,
) (exporter.Metrics, error) {
cfg := checkAndCastConfig(c, set.TelemetrySettings.Logger)
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname)
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.sourceTimeout)
if err != nil {
return nil, fmt.Errorf("failed to build hostname provider: %w", err)
}
Expand Down Expand Up @@ -409,7 +409,7 @@ func (f *factory) createTracesExporter(
wg sync.WaitGroup // waits for agent to exit
)

hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname)
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.sourceTimeout)
if err != nil {
return nil, fmt.Errorf("failed to build hostname provider: %w", err)
}
Expand Down Expand Up @@ -496,7 +496,7 @@ func (f *factory) createLogsExporter(

var pusher consumer.ConsumeLogsFunc
var logsAgent logsagentpipeline.LogsAgent
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname)
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.sourceTimeout)
if err != nil {
return nil, fmt.Errorf("failed to build hostname provider: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions exporter/datadogexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@ func TestOnlyMetadata(t *testing.T) {
HostMetadata: HostMetadataConfig{
Enabled: true,
HostnameSource: HostnameSourceFirstResource,
sourceTimeout: 50 * time.Millisecond,
},
}

Expand Down
4 changes: 3 additions & 1 deletion exporter/datadogexporter/internal/hostmetadata/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package hostmetadata // import "github.com/open-telemetry/opentelemetry-collecto

import (
"fmt"
"time"

"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"go.opentelemetry.io/collector/component"
Expand All @@ -27,7 +28,7 @@ var _ = featuregate.GlobalRegistry().MustRegister(
featuregate.WithRegisterToVersion("0.75.0"),
)

func GetSourceProvider(set component.TelemetrySettings, configHostname string) (source.Provider, error) {
func GetSourceProvider(set component.TelemetrySettings, configHostname string, timeout time.Duration) (source.Provider, error) {
mx-psi marked this conversation as resolved.
Show resolved Hide resolved
ecs, err := ecs.NewProvider(set)
if err != nil {
return nil, fmt.Errorf("failed to build ECS Fargate provider: %w", err)
Expand Down Expand Up @@ -69,6 +70,7 @@ func GetSourceProvider(set component.TelemetrySettings, configHostname string) (
"system": system.NewProvider(set.Logger),
},
[]string{"config", "azure", "ecs", "ec2", "gcp", "kubernetes", "system"},
timeout,
)

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ package hostmetadata
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
)

func TestHost(t *testing.T) {
p, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "test-host")
p, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "test-host", 31*time.Second)
require.NoError(t, err)
src, err := p.Source(context.Background())
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func isDefaultHostname(hostname string) bool {
}

// GetHostInfo gets the hostname info from EC2 metadata
func GetHostInfo(logger *zap.Logger) (hostInfo *HostInfo) {
func GetHostInfo(ctx context.Context, logger *zap.Logger) (hostInfo *HostInfo) {
sess, err := session.NewSession()
hostInfo = &HostInfo{}

Expand All @@ -54,18 +54,18 @@ func GetHostInfo(logger *zap.Logger) (hostInfo *HostInfo) {

meta := ec2metadata.New(sess)

if !meta.Available() {
if !meta.AvailableWithContext(ctx) {
logger.Debug("EC2 Metadata not available")
return
}

if idDoc, err := meta.GetInstanceIdentityDocument(); err == nil {
if idDoc, err := meta.GetInstanceIdentityDocumentWithContext(ctx); err == nil {
hostInfo.InstanceID = idDoc.InstanceID
} else {
logger.Warn("Failed to get EC2 instance id document", zap.Error(err))
}

if ec2Hostname, err := meta.GetMetadata("hostname"); err == nil {
if ec2Hostname, err := meta.GetMetadataWithContext(ctx, "hostname"); err == nil {
hostInfo.EC2Hostname = ec2Hostname
} else {
logger.Warn("Failed to get EC2 hostname", zap.Error(err))
Expand Down Expand Up @@ -104,12 +104,12 @@ func NewProvider(logger *zap.Logger) (*Provider, error) {
}, nil
}

func (p *Provider) fillHostInfo() {
p.once.Do(func() { p.hostInfo = *GetHostInfo(p.logger) })
func (p *Provider) fillHostInfo(ctx context.Context) {
p.once.Do(func() { p.hostInfo = *GetHostInfo(ctx, p.logger) })
}

func (p *Provider) Source(_ context.Context) (source.Source, error) {
p.fillHostInfo()
func (p *Provider) Source(ctx context.Context) (source.Source, error) {
p.fillHostInfo(ctx)
if p.hostInfo.InstanceID == "" {
return source.Source{}, fmt.Errorf("instance ID is unavailable")
}
Expand Down Expand Up @@ -175,6 +175,6 @@ func (p *Provider) ClusterName(ctx context.Context) (string, error) {
}

func (p *Provider) HostInfo() *HostInfo {
p.fillHostInfo()
p.fillHostInfo(context.Background())
return &p.hostInfo
}
4 changes: 2 additions & 2 deletions exporter/datadogexporter/internal/hostmetadata/metadata.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package metadata is responsible for collecting host metadata from different providers
// Package hostmetadata is responsible for collecting host metadata from different providers
// such as EC2, ECS, AWS, etc and pushing it to Datadog.
package hostmetadata // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/hostmetadata"

Expand Down Expand Up @@ -77,7 +77,7 @@ func fillHostMetadata(params exporter.Settings, pcfg PusherConfig, p source.Prov
hm.Processes = gohai.NewProcessesPayload(hm.Meta.Hostname, params.Logger)
// EC2 data was not set from attributes
if hm.Meta.EC2Hostname == "" {
ec2HostInfo := ec2.GetHostInfo(params.Logger)
ec2HostInfo := ec2.GetHostInfo(context.Background(), params.Logger)
hm.Meta.EC2Hostname = ec2HostInfo.EC2Hostname
hm.Meta.InstanceID = ec2HostInfo.InstanceID
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestFillHostMetadata(t *testing.T) {
ConfigTags: []string{"key1:tag1", "key2:tag2", "env:prod"},
}

hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "hostname")
hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "hostname", 31*time.Second)
require.NoError(t, err)

metadata := payload.NewEmpty()
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestPusher(t *testing.T) {
params := exportertest.NewNopSettings()
params.BuildInfo = mockBuildInfo

hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "source-hostname")
hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "source-hostname", 31*time.Second)
require.NoError(t, err)

attrs := testutil.NewAttributeMap(map[string]string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"go.uber.org/zap"
Expand All @@ -19,6 +20,7 @@ type chainProvider struct {
logger *zap.Logger
providers map[string]source.Provider
priorityList []string
timeout time.Duration
}

func (p *chainProvider) Source(ctx context.Context) (source.Source, error) {
Expand All @@ -32,14 +34,24 @@ func (p *chainProvider) Source(ctx context.Context) (source.Source, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Make a different context for our provider calls, to differentiate between a provider timing out and the entire
// context being cancelled
var childCtx context.Context
if p.timeout != 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to do this here and not at the top level (when we build the source provider)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused - which part should we move where? Moving the creation of the child context into the Chain function?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving the WithTimeout up to where we call Source, instead of doing it here. It does mean adding it in more places, which is a bit more annoying. I'll leave what to do up to you.

Also, I am now wondering whether we want a separate timeout for this, or whether the per-OTLP blob timeout added by the exporterhelper is enough. Probably we want a separate one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah - I think it's good to leave it in here. I think it's good for this logic to be right next to the call site, and if we ever want to do something fancier, like a per-source timeout, it will be much easier to add it here.

I definitely think we want a separate timeout for this! The per-OTLP blob timeout should be used only for the normal export case - not for the occasions when we're blocked on fetching a source.

childCtx, cancel = context.WithTimeout(ctx, p.timeout)
} else {
childCtx, cancel = context.WithCancel(ctx)
}
defer cancel()

// Run all providers in parallel
replies := make([]chan reply, len(p.priorityList))
for i, source := range p.priorityList {
provider := p.providers[source]
replies[i] = make(chan reply)
p.logger.Debug("Trying out source provider", zap.String("provider", source))
go func(i int) {
src, err := provider.Source(ctx)
src, err := provider.Source(childCtx)
replies[i] <- reply{src: src, err: err}
}(i)
}
Expand All @@ -65,14 +77,14 @@ func (p *chainProvider) Source(ctx context.Context) (source.Source, error) {
}

// Chain providers into a single provider that returns the first available hostname.
func Chain(logger *zap.Logger, providers map[string]source.Provider, priorityList []string) (source.Provider, error) {
func Chain(logger *zap.Logger, providers map[string]source.Provider, priorityList []string, timeout time.Duration) (source.Provider, error) {
for _, source := range priorityList {
if _, ok := providers[source]; !ok {
return nil, fmt.Errorf("%q source is not available in providers", source)
}
}

return &chainProvider{logger: logger, providers: providers, priorityList: priorityList}, nil
return &chainProvider{logger: logger, providers: providers, priorityList: priorityList, timeout: timeout}, nil
}

var _ source.Provider = (*configProvider)(nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package provider // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"context"
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -38,8 +39,12 @@ type delayedProvider struct {
}

func (p *delayedProvider) Source(ctx context.Context) (source.Source, error) {
time.Sleep(p.delay)
return p.provider.Source(ctx)
select {
case <-ctx.Done():
return source.Source{}, fmt.Errorf("no source provider was available")
case <-time.After(p.delay):
return p.provider.Source(ctx)
}
}

func withDelay(provider source.Provider, delay time.Duration) source.Provider {
Expand All @@ -56,6 +61,7 @@ func TestChain(t *testing.T) {

hostname string
queryErr string
timeout time.Duration
}{
{
name: "missing provider in priority list",
Expand All @@ -78,6 +84,18 @@ func TestChain(t *testing.T) {

queryErr: "no source provider was available",
},
{
name: "all providers timeout",
providers: map[string]source.Provider{
"p1": withDelay(HostProvider("p1SourceName"), 100*time.Millisecond),
"p2": withDelay(HostProvider("p2SourceName"), 100*time.Millisecond),
"p3": withDelay(HostProvider("p3SourceName"), 100*time.Millisecond),
},
priorityList: []string{"p1", "p2", "p3"},

queryErr: "no source provider was available",
timeout: 10 * time.Millisecond,
},
{
name: "no providers fail",
providers: map[string]source.Provider{
Expand Down Expand Up @@ -111,11 +129,23 @@ func TestChain(t *testing.T) {

hostname: "p2SourceName",
},
{
name: "p2 times out",
providers: map[string]source.Provider{
"p1": ErrorSourceProvider("p1Err"),
"p2": withDelay(HostProvider("p2SourceName"), 50*time.Millisecond),
"p3": HostProvider("p3SourceName"),
},
priorityList: []string{"p1", "p2", "p3"},

hostname: "p3SourceName",
timeout: 10 * time.Millisecond,
},
}

for _, testInstance := range tests {
t.Run(testInstance.name, func(t *testing.T) {
provider, err := Chain(zaptest.NewLogger(t), testInstance.providers, testInstance.priorityList)
provider, err := Chain(zaptest.NewLogger(t), testInstance.providers, testInstance.priorityList, testInstance.timeout)
if err != nil || testInstance.buildErr != "" {
assert.EqualError(t, err, testInstance.buildErr)
return
Expand Down
3 changes: 3 additions & 0 deletions exporter/datadogexporter/metrics_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func TestNewExporter(t *testing.T) {
CumulativeMonotonicMode: CumulativeMonotonicSumModeToDelta,
},
},
HostMetadata: HostMetadataConfig{
sourceTimeout: 50 * time.Millisecond,
},
}
params := exportertest.NewNopSettings()
f := NewFactory()
Expand Down