Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/resourcedetectionprocessor] Add attribute allowlist to Resource Detection Processor #8547

Merged
merged 11 commits into from
Mar 22, 2022
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- `resourcedetectionprocessor`: Add attribute allowlist (#8547)

## v0.47.0

### 💡 Enhancements 💡
Expand Down
2 changes: 2 additions & 0 deletions processor/resourcedetectionprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ processors:
detectors: [ <string> ]
# determines if existing resource attributes should be overridden or preserved, defaults to true
override: <bool>
# When included, only attributes in the list will be appened. Applies to all detectors.
attributes: [ <string> ]
```

## Ordering
Expand Down
3 changes: 3 additions & 0 deletions processor/resourcedetectionprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type Config struct {
// HTTP client settings for the detector
// Timeout default is 5s
confighttp.HTTPClientSettings `mapstructure:",squash"`
// Attributes is an allowlist of attributes to add.
// If a supplied attribute is not a valid atrtibute of a supplied detector it will be ignored.
Attributes []string `mapstructure:"attributes"`
}

// DetectorConfig contains user-specified configurations unique to all individual detectors
Expand Down
1 change: 1 addition & 0 deletions processor/resourcedetectionprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestLoadConfig(t *testing.T) {
},
HTTPClientSettings: confighttp.HTTPClientSettings{Timeout: 2 * time.Second, MaxIdleConns: p2.MaxIdleConns, IdleConnTimeout: p2.IdleConnTimeout},
Override: false,
Attributes: []string{"a", "b"},
}
assert.Equal(t, p4, p4e)
}
Expand Down
6 changes: 4 additions & 2 deletions processor/resourcedetectionprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func createDefaultConfig() config.Processor {
Detectors: []string{env.TypeStr},
HTTPClientSettings: defaultHTTPClientSettings(),
Override: true,
Attributes: nil,
// TODO: Once issue(https://github.com/open-telemetry/opentelemetry-collector/issues/4001) gets resolved,
// Set the default value of 'hostname_source' here instead of 'system' detector
}
Expand Down Expand Up @@ -170,7 +171,7 @@ func (f *factory) getResourceDetectionProcessor(
) (*resourceDetectionProcessor, error) {
oCfg := cfg.(*Config)

provider, err := f.getResourceProvider(params, cfg.ID(), oCfg.HTTPClientSettings.Timeout, oCfg.Detectors, oCfg.DetectorConfig)
provider, err := f.getResourceProvider(params, cfg.ID(), oCfg.HTTPClientSettings.Timeout, oCfg.Detectors, oCfg.DetectorConfig, oCfg.Attributes)
if err != nil {
return nil, err
}
Expand All @@ -189,6 +190,7 @@ func (f *factory) getResourceProvider(
timeout time.Duration,
configuredDetectors []string,
detectorConfigs DetectorConfig,
attributes []string,
) (*internal.ResourceProvider, error) {
f.lock.Lock()
defer f.lock.Unlock()
Expand All @@ -202,7 +204,7 @@ func (f *factory) getResourceProvider(
detectorTypes = append(detectorTypes, internal.DetectorType(strings.TrimSpace(key)))
}

provider, err := f.resourceProviderFactory.CreateResourceProvider(params, timeout, &detectorConfigs, detectorTypes...)
provider, err := f.resourceProviderFactory.CreateResourceProvider(params, timeout, attributes, &detectorConfigs, detectorTypes...)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,22 @@ func NewProviderFactory(detectors map[DetectorType]DetectorFactory) *ResourcePro
func (f *ResourceProviderFactory) CreateResourceProvider(
params component.ProcessorCreateSettings,
timeout time.Duration,
attributes []string,
detectorConfigs ResourceDetectorConfig,
detectorTypes ...DetectorType) (*ResourceProvider, error) {
detectors, err := f.getDetectors(params, detectorConfigs, detectorTypes)
if err != nil {
return nil, err
}

provider := NewResourceProvider(params.Logger, timeout, detectors...)
attributesToKeep := make(map[string]struct{})
if len(attributes) > 0 {
for _, attribute := range attributes {
attributesToKeep[attribute] = struct{}{}
}
}

provider := NewResourceProvider(params.Logger, timeout, attributesToKeep, detectors...)
return provider, nil
}

Expand Down Expand Up @@ -90,6 +98,7 @@ type ResourceProvider struct {
detectors []Detector
detectedResource *resourceResult
once sync.Once
attributesToKeep map[string]struct{}
}

type resourceResult struct {
Expand All @@ -98,11 +107,12 @@ type resourceResult struct {
err error
}

func NewResourceProvider(logger *zap.Logger, timeout time.Duration, detectors ...Detector) *ResourceProvider {
func NewResourceProvider(logger *zap.Logger, timeout time.Duration, attributesToKeep map[string]struct{}, detectors ...Detector) *ResourceProvider {
return &ResourceProvider{
logger: logger,
timeout: timeout,
detectors: detectors,
logger: logger,
timeout: timeout,
detectors: detectors,
attributesToKeep: attributesToKeep,
}
}

Expand Down Expand Up @@ -133,9 +143,10 @@ func (p *ResourceProvider) detectResource(ctx context.Context) {
mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, schemaURL)
MergeResource(res, r, false)
}

}

FilterAttributes(res.Attributes(), p.attributesToKeep)

p.logger.Info("detected resource information", zap.Any("resource", AttributesToMap(res.Attributes())))
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved

p.detectedResource.resource = res
Expand Down Expand Up @@ -194,6 +205,15 @@ func MergeSchemaURL(currentSchemaURL string, newSchemaURL string) string {
return currentSchemaURL
}

func FilterAttributes(am pdata.AttributeMap, attributesToKeep map[string]struct{}) {
if len(attributesToKeep) > 0 {
am.RemoveIf(func(k string, v pdata.AttributeValue) bool {
_, keep := attributesToKeep[k]
return !keep
})
}
}

func MergeResource(to, from pdata.Resource, overrideTo bool) {
if IsEmptyResource(from) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestDetect(t *testing.T) {
name string
detectedResources []pdata.Resource
expectedResource pdata.Resource
attributes []string
}{
{
name: "Detect three resources",
Expand All @@ -61,6 +62,7 @@ func TestDetect(t *testing.T) {
NewResource(map[string]interface{}{"a": "12", "c": "3"}),
},
expectedResource: NewResource(map[string]interface{}{"a": "1", "b": "2", "c": "3"}),
attributes: nil,
}, {
name: "Detect empty resources",
detectedResources: []pdata.Resource{
Expand All @@ -69,6 +71,7 @@ func TestDetect(t *testing.T) {
NewResource(map[string]interface{}{"a": "11"}),
},
expectedResource: NewResource(map[string]interface{}{"a": "1", "b": "2"}),
attributes: nil,
}, {
name: "Detect non-string resources",
detectedResources: []pdata.Resource{
Expand All @@ -77,6 +80,16 @@ func TestDetect(t *testing.T) {
NewResource(map[string]interface{}{"a": "11"}),
},
expectedResource: NewResource(map[string]interface{}{"a": "11", "bool": true, "int": int64(2), "double": 0.5}),
attributes: nil,
}, {
name: "Filter to one attribute",
detectedResources: []pdata.Resource{
NewResource(map[string]interface{}{"a": "1", "b": "2"}),
NewResource(map[string]interface{}{"a": "11", "c": "3"}),
NewResource(map[string]interface{}{"a": "12", "c": "3"}),
},
expectedResource: NewResource(map[string]interface{}{"a": "1"}),
attributes: []string{"a"},
},
}

Expand All @@ -97,7 +110,7 @@ func TestDetect(t *testing.T) {
}

f := NewProviderFactory(mockDetectors)
p, err := f.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, &mockDetectorConfig{}, mockDetectorTypes...)
p, err := f.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, tt.attributes, &mockDetectorConfig{}, mockDetectorTypes...)
require.NoError(t, err)

got, _, err := p.Get(context.Background(), http.DefaultClient)
Expand All @@ -113,7 +126,7 @@ func TestDetect(t *testing.T) {
func TestDetectResource_InvalidDetectorType(t *testing.T) {
mockDetectorKey := DetectorType("mock")
p := NewProviderFactory(map[DetectorType]DetectorFactory{})
_, err := p.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, &mockDetectorConfig{}, mockDetectorKey)
_, err := p.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, nil, &mockDetectorConfig{}, mockDetectorKey)
require.EqualError(t, err, fmt.Sprintf("invalid detector key: %v", mockDetectorKey))
}

Expand All @@ -124,7 +137,7 @@ func TestDetectResource_DetectoryFactoryError(t *testing.T) {
return nil, errors.New("creation failed")
},
})
_, err := p.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, &mockDetectorConfig{}, mockDetectorKey)
_, err := p.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, nil, &mockDetectorConfig{}, mockDetectorKey)
require.EqualError(t, err, fmt.Sprintf("failed creating detector type %q: %v", mockDetectorKey, "creation failed"))
}

Expand All @@ -135,7 +148,7 @@ func TestDetectResource_Error(t *testing.T) {
md2 := &MockDetector{}
md2.On("Detect").Return(pdata.NewResource(), errors.New("err1"))

p := NewResourceProvider(zap.NewNop(), time.Second, md1, md2)
p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)
_, _, err := p.Get(context.Background(), http.DefaultClient)
require.NoError(t, err)
}
Expand Down Expand Up @@ -205,7 +218,7 @@ func TestDetectResource_Parallel(t *testing.T) {
expectedResource := NewResource(map[string]interface{}{"a": "1", "b": "2", "c": "3"})
expectedResource.Attributes().Sort()

p := NewResourceProvider(zap.NewNop(), time.Second, md1, md2, md3)
p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2, md3)

// call p.Get multiple times
wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -235,6 +248,75 @@ func TestDetectResource_Parallel(t *testing.T) {
md3.AssertNumberOfCalls(t, "Detect", 1)
}

func TestFilterAttributes_Match(t *testing.T) {
m := map[string]struct{}{
"host.name": {},
"host.id": {},
}
attr := pdata.NewAttributeMap()
attr.InsertString("host.name", "test")
attr.InsertString("host.id", "test")
attr.InsertString("drop.this", "test")

FilterAttributes(attr, m)

_, ok := attr.Get("host.name")
assert.True(t, ok)

_, ok = attr.Get("host.id")
assert.True(t, ok)

_, ok = attr.Get("drop.this")
assert.False(t, ok)
}

func TestFilterAttributes_NoMatch(t *testing.T) {
m := map[string]struct{}{
"cloud.region": {},
}
attr := pdata.NewAttributeMap()
attr.InsertString("host.name", "test")
attr.InsertString("host.id", "test")

FilterAttributes(attr, m)

_, ok := attr.Get("host.name")
assert.False(t, ok)

_, ok = attr.Get("host.id")
assert.False(t, ok)
}

func TestFilterAttributes_NilAttributes(t *testing.T) {
var m map[string]struct{}
attr := pdata.NewAttributeMap()
attr.InsertString("host.name", "test")
attr.InsertString("host.id", "test")

FilterAttributes(attr, m)

_, ok := attr.Get("host.name")
assert.True(t, ok)

_, ok = attr.Get("host.id")
assert.True(t, ok)
}

func TestFilterAttributes_NoAttributes(t *testing.T) {
m := make(map[string]struct{})
attr := pdata.NewAttributeMap()
attr.InsertString("host.name", "test")
attr.InsertString("host.id", "test")

FilterAttributes(attr, m)

_, ok := attr.Get("host.name")
assert.True(t, ok)

_, ok = attr.Get("host.id")
assert.True(t, ok)
}

func TestAttributesToMap(t *testing.T) {
m := map[string]interface{}{
"str": "a",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ processors:
override: false
system:
hostname_sources: [os]
attributes: ["a", "b"]
resourcedetection/docker:
detectors: [env, docker]
timeout: 2s
Expand Down