diff --git a/CHANGELOG.md b/CHANGELOG.md index faaf68319832..cf300718c655 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ - `datadogexporter`: Deprecate `OnlyMetadata` method from `Config` struct (#8359) +- `resourcedetectionprocessor`: Add attribute allowlist (#8547) + ## v0.47.0 ### 💡 Enhancements 💡 diff --git a/processor/resourcedetectionprocessor/README.md b/processor/resourcedetectionprocessor/README.md index 1c6e64648a63..785b9b283bff 100644 --- a/processor/resourcedetectionprocessor/README.md +++ b/processor/resourcedetectionprocessor/README.md @@ -268,6 +268,8 @@ processors: detectors: [ ] # determines if existing resource attributes should be overridden or preserved, defaults to true override: +# When included, only attributes in the list will be appended. Applies to all detectors. +attributes: [ ] ``` ## Ordering diff --git a/processor/resourcedetectionprocessor/config.go b/processor/resourcedetectionprocessor/config.go index 22e2a07e575a..77dc74309280 100644 --- a/processor/resourcedetectionprocessor/config.go +++ b/processor/resourcedetectionprocessor/config.go @@ -39,6 +39,9 @@ type Config struct { // HTTP client settings for the detector // Timeout default is 5s confighttp.HTTPClientSettings `mapstructure:",squash"` + // Attributes is an allowlist of attributes to add. + // If a supplied attribute is not a valid attribute of a supplied detector, it will be ignored. 
+ Attributes []string `mapstructure:"attributes"` } // DetectorConfig contains user-specified configurations unique to all individual detectors diff --git a/processor/resourcedetectionprocessor/config_test.go b/processor/resourcedetectionprocessor/config_test.go index 7cbbbabb28c0..5ec161b532f3 100644 --- a/processor/resourcedetectionprocessor/config_test.go +++ b/processor/resourcedetectionprocessor/config_test.go @@ -78,6 +78,7 @@ func TestLoadConfig(t *testing.T) { }, HTTPClientSettings: confighttp.HTTPClientSettings{Timeout: 2 * time.Second, MaxIdleConns: p2.MaxIdleConns, IdleConnTimeout: p2.IdleConnTimeout}, Override: false, + Attributes: []string{"a", "b"}, } assert.Equal(t, p4, p4e) } diff --git a/processor/resourcedetectionprocessor/factory.go b/processor/resourcedetectionprocessor/factory.go index e36969519a7a..448d260744c8 100644 --- a/processor/resourcedetectionprocessor/factory.go +++ b/processor/resourcedetectionprocessor/factory.go @@ -96,6 +96,7 @@ func createDefaultConfig() config.Processor { Detectors: []string{env.TypeStr}, HTTPClientSettings: defaultHTTPClientSettings(), Override: true, + Attributes: nil, // TODO: Once issue(https://github.com/open-telemetry/opentelemetry-collector/issues/4001) gets resolved, // Set the default value of 'hostname_source' here instead of 'system' detector } @@ -170,7 +171,7 @@ func (f *factory) getResourceDetectionProcessor( ) (*resourceDetectionProcessor, error) { oCfg := cfg.(*Config) - provider, err := f.getResourceProvider(params, cfg.ID(), oCfg.HTTPClientSettings.Timeout, oCfg.Detectors, oCfg.DetectorConfig) + provider, err := f.getResourceProvider(params, cfg.ID(), oCfg.HTTPClientSettings.Timeout, oCfg.Detectors, oCfg.DetectorConfig, oCfg.Attributes) if err != nil { return nil, err } @@ -189,6 +190,7 @@ func (f *factory) getResourceProvider( timeout time.Duration, configuredDetectors []string, detectorConfigs DetectorConfig, + attributes []string, ) (*internal.ResourceProvider, error) { f.lock.Lock() defer 
f.lock.Unlock() @@ -202,7 +204,7 @@ func (f *factory) getResourceProvider( detectorTypes = append(detectorTypes, internal.DetectorType(strings.TrimSpace(key))) } - provider, err := f.resourceProviderFactory.CreateResourceProvider(params, timeout, &detectorConfigs, detectorTypes...) + provider, err := f.resourceProviderFactory.CreateResourceProvider(params, timeout, attributes, &detectorConfigs, detectorTypes...) if err != nil { return nil, err } diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection.go b/processor/resourcedetectionprocessor/internal/resourcedetection.go index f6016acac67a..356f9c4586ec 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection.go @@ -54,6 +54,7 @@ func NewProviderFactory(detectors map[DetectorType]DetectorFactory) *ResourcePro func (f *ResourceProviderFactory) CreateResourceProvider( params component.ProcessorCreateSettings, timeout time.Duration, + attributes []string, detectorConfigs ResourceDetectorConfig, detectorTypes ...DetectorType) (*ResourceProvider, error) { detectors, err := f.getDetectors(params, detectorConfigs, detectorTypes) @@ -61,7 +62,14 @@ func (f *ResourceProviderFactory) CreateResourceProvider( return nil, err } - provider := NewResourceProvider(params.Logger, timeout, detectors...) + attributesToKeep := make(map[string]struct{}) + if len(attributes) > 0 { + for _, attribute := range attributes { + attributesToKeep[attribute] = struct{}{} + } + } + + provider := NewResourceProvider(params.Logger, timeout, attributesToKeep, detectors...) 
return provider, nil } @@ -90,6 +98,7 @@ type ResourceProvider struct { detectors []Detector detectedResource *resourceResult once sync.Once + attributesToKeep map[string]struct{} } type resourceResult struct { @@ -98,11 +107,12 @@ type resourceResult struct { err error } -func NewResourceProvider(logger *zap.Logger, timeout time.Duration, detectors ...Detector) *ResourceProvider { +func NewResourceProvider(logger *zap.Logger, timeout time.Duration, attributesToKeep map[string]struct{}, detectors ...Detector) *ResourceProvider { return &ResourceProvider{ - logger: logger, - timeout: timeout, - detectors: detectors, + logger: logger, + timeout: timeout, + detectors: detectors, + attributesToKeep: attributesToKeep, } } @@ -133,10 +143,14 @@ func (p *ResourceProvider) detectResource(ctx context.Context) { mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, schemaURL) MergeResource(res, r, false) } - } + droppedAttributes := filterAttributes(res.Attributes(), p.attributesToKeep) + p.logger.Info("detected resource information", zap.Any("resource", AttributesToMap(res.Attributes()))) + if len(droppedAttributes) > 0 { + p.logger.Info("dropped resource information", zap.Strings("resource keys", droppedAttributes)) + } p.detectedResource.resource = res p.detectedResource.schemaURL = mergedSchemaURL @@ -194,6 +208,21 @@ func MergeSchemaURL(currentSchemaURL string, newSchemaURL string) string { return currentSchemaURL } +func filterAttributes(am pdata.AttributeMap, attributesToKeep map[string]struct{}) []string { + if len(attributesToKeep) > 0 { + droppedAttributes := make([]string, 0) + am.RemoveIf(func(k string, v pdata.Value) bool { + _, keep := attributesToKeep[k] + if !keep { + droppedAttributes = append(droppedAttributes, k) + } + return !keep + }) + return droppedAttributes + } + return nil +} + func MergeResource(to, from pdata.Resource, overrideTo bool) { if IsEmptyResource(from) { return diff --git 
a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go index 30b07dafe3b1..e2b1df2cdf68 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go @@ -52,6 +52,7 @@ func TestDetect(t *testing.T) { name string detectedResources []pdata.Resource expectedResource pdata.Resource + attributes []string }{ { name: "Detect three resources", @@ -61,6 +62,7 @@ func TestDetect(t *testing.T) { NewResource(map[string]interface{}{"a": "12", "c": "3"}), }, expectedResource: NewResource(map[string]interface{}{"a": "1", "b": "2", "c": "3"}), + attributes: nil, }, { name: "Detect empty resources", detectedResources: []pdata.Resource{ @@ -69,6 +71,7 @@ func TestDetect(t *testing.T) { NewResource(map[string]interface{}{"a": "11"}), }, expectedResource: NewResource(map[string]interface{}{"a": "1", "b": "2"}), + attributes: nil, }, { name: "Detect non-string resources", detectedResources: []pdata.Resource{ @@ -77,6 +80,16 @@ func TestDetect(t *testing.T) { NewResource(map[string]interface{}{"a": "11"}), }, expectedResource: NewResource(map[string]interface{}{"a": "11", "bool": true, "int": int64(2), "double": 0.5}), + attributes: nil, + }, { + name: "Filter to one attribute", + detectedResources: []pdata.Resource{ + NewResource(map[string]interface{}{"a": "1", "b": "2"}), + NewResource(map[string]interface{}{"a": "11", "c": "3"}), + NewResource(map[string]interface{}{"a": "12", "c": "3"}), + }, + expectedResource: NewResource(map[string]interface{}{"a": "1"}), + attributes: []string{"a"}, }, } @@ -97,7 +110,7 @@ func TestDetect(t *testing.T) { } f := NewProviderFactory(mockDetectors) - p, err := f.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, &mockDetectorConfig{}, mockDetectorTypes...) 
+ p, err := f.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, tt.attributes, &mockDetectorConfig{}, mockDetectorTypes...) require.NoError(t, err) got, _, err := p.Get(context.Background(), http.DefaultClient) @@ -113,7 +126,7 @@ func TestDetect(t *testing.T) { func TestDetectResource_InvalidDetectorType(t *testing.T) { mockDetectorKey := DetectorType("mock") p := NewProviderFactory(map[DetectorType]DetectorFactory{}) - _, err := p.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, &mockDetectorConfig{}, mockDetectorKey) + _, err := p.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, nil, &mockDetectorConfig{}, mockDetectorKey) require.EqualError(t, err, fmt.Sprintf("invalid detector key: %v", mockDetectorKey)) } @@ -124,7 +137,7 @@ func TestDetectResource_DetectoryFactoryError(t *testing.T) { return nil, errors.New("creation failed") }, }) - _, err := p.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, &mockDetectorConfig{}, mockDetectorKey) + _, err := p.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, nil, &mockDetectorConfig{}, mockDetectorKey) require.EqualError(t, err, fmt.Sprintf("failed creating detector type %q: %v", mockDetectorKey, "creation failed")) } @@ -135,7 +148,7 @@ func TestDetectResource_Error(t *testing.T) { md2 := &MockDetector{} md2.On("Detect").Return(pdata.NewResource(), errors.New("err1")) - p := NewResourceProvider(zap.NewNop(), time.Second, md1, md2) + p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2) _, _, err := p.Get(context.Background(), http.DefaultClient) require.NoError(t, err) } @@ -205,7 +218,7 @@ func TestDetectResource_Parallel(t *testing.T) { expectedResource := NewResource(map[string]interface{}{"a": "1", "b": "2", "c": "3"}) expectedResource.Attributes().Sort() - p := NewResourceProvider(zap.NewNop(), time.Second, md1, md2, md3) + p := 
NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2, md3) // call p.Get multiple times wg := &sync.WaitGroup{} @@ -235,6 +248,83 @@ func TestDetectResource_Parallel(t *testing.T) { md3.AssertNumberOfCalls(t, "Detect", 1) } +func TestFilterAttributes_Match(t *testing.T) { + m := map[string]struct{}{ + "host.name": {}, + "host.id": {}, + } + attr := pdata.NewAttributeMap() + attr.InsertString("host.name", "test") + attr.InsertString("host.id", "test") + attr.InsertString("drop.this", "test") + + droppedAttributes := filterAttributes(attr, m) + + _, ok := attr.Get("host.name") + assert.True(t, ok) + + _, ok = attr.Get("host.id") + assert.True(t, ok) + + _, ok = attr.Get("drop.this") + assert.False(t, ok) + + assert.Contains(t, droppedAttributes, "drop.this") +} + +func TestFilterAttributes_NoMatch(t *testing.T) { + m := map[string]struct{}{ + "cloud.region": {}, + } + attr := pdata.NewAttributeMap() + attr.InsertString("host.name", "test") + attr.InsertString("host.id", "test") + + droppedAttributes := filterAttributes(attr, m) + + _, ok := attr.Get("host.name") + assert.False(t, ok) + + _, ok = attr.Get("host.id") + assert.False(t, ok) + + assert.EqualValues(t, droppedAttributes, []string{"host.name", "host.id"}) +} + +func TestFilterAttributes_NilAttributes(t *testing.T) { + var m map[string]struct{} + attr := pdata.NewAttributeMap() + attr.InsertString("host.name", "test") + attr.InsertString("host.id", "test") + + droppedAttributes := filterAttributes(attr, m) + + _, ok := attr.Get("host.name") + assert.True(t, ok) + + _, ok = attr.Get("host.id") + assert.True(t, ok) + + assert.Equal(t, len(droppedAttributes), 0) +} + +func TestFilterAttributes_NoAttributes(t *testing.T) { + m := make(map[string]struct{}) + attr := pdata.NewAttributeMap() + attr.InsertString("host.name", "test") + attr.InsertString("host.id", "test") + + droppedAttributes := filterAttributes(attr, m) + + _, ok := attr.Get("host.name") + assert.True(t, ok) + + _, ok = attr.Get("host.id") + 
assert.True(t, ok) + + assert.Equal(t, len(droppedAttributes), 0) +} + func TestAttributesToMap(t *testing.T) { m := map[string]interface{}{ "str": "a", diff --git a/processor/resourcedetectionprocessor/testdata/config.yaml b/processor/resourcedetectionprocessor/testdata/config.yaml index f43e409f56f2..3b591fb74fbd 100644 --- a/processor/resourcedetectionprocessor/testdata/config.yaml +++ b/processor/resourcedetectionprocessor/testdata/config.yaml @@ -25,6 +25,7 @@ processors: override: false system: hostname_sources: [os] + attributes: ["a", "b"] resourcedetection/docker: detectors: [env, docker] timeout: 2s