Skip to content

Commit

Permalink
add properties.discovery.yaml support
Browse files Browse the repository at this point in the history
  • Loading branch information
rmfitzpatrick committed Mar 17, 2023
1 parent 14cbbc6 commit f587a65
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 48 deletions.
48 changes: 47 additions & 1 deletion internal/confmapprovider/discovery/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,50 @@ successfully started observers.
1. Stop all temporary components before continuing on to the actual Collector service (or exiting early with `--dry-run`).


By default, the Discovery mode is provided with pre-made discovery config components in [`bundle.d`](./bundle/README.md).
By default, the Discovery mode is provided with pre-made discovery config components in [`bundle.d`](./bundle/README.md).


### Discovery properties

Configuring discovery components is performed by merging discovery properties with the config.d receivers
and extensions `*.discovery.yaml` files. Discovery properties are of the form:

```yaml
splunk.discovery.receivers.<receiver-type(/name)>.config.<field>(<::subfield>)*: <value>
splunk.discovery.extensions.<observer-type(/name)>.config.<field>(<::subfield>)*: <value>
splunk.discovery.receivers.<receiver-type(/name)>.enabled.: <true or false>
splunk.discovery.extensions.<observer-type(/name)>.enabled: <true or false>

# Examples
splunk.discovery.receivers.prometheus_simple.config.labels::my_label: my_label_value
splunk.discovery.receivers.prometheus_simple.enabled: true

splunk.discovery.extensions.docker_observer.config.endpoint: tcp://localhost:8080
splunk.discovery.extensions.k8s_observer.enabled: false
```
These properties can be in `config.d/properties.discovery.yaml` or specified at run time with `--set` command line options.

Each discovery property also has an equivalent environment variable form using `_x<hex pair>_` encoded delimiters for
non-word characters `[^a-zA-Z0-9_]`:

```bash
SPLUNK_DISCOVERY_RECEIVERS_receiver_x2d_type_x2f_receiver_x2d_name_CONFIG_field_x3a__x3a_subfield=value
SPLUNK_DISCOVERY_EXTENSIONS_observer_x2d_type_x2f_observer_x2d_name_CONFIG_field_x3a__x3a_subfield=value
SPLUNK_DISCOVERY_RECEIVERS_receiver_x2d_type_x2f_receiver_x2d_name_ENABLED=<true or false>
SPLUNK_DISCOVERY_EXTENSIONS_observer_x2d_type_x2f_observer_x2d_name_ENABLED=<true or false>
# Examples
SPLUNK_DISCOVERY_RECEIVERS_prometheus_simple_CONFIG_labels_x3a__x3a_my_label="my_username"
SPLUNK_DISCOVERY_RECEIVERS_prometheus_simple_ENABLED=true
SPLUNK_DISCOVERY_EXTENSIONS_docker_observer_CONFIG_endpoint="tcp://localhost:8080"
SPLUNK_DISCOVERY_EXTENSIONS_k8s_observer_ENABLED=false
```

The priority order for discovery config content from lowest to highest is:

1. `config.d/<receivers or extensions>/*.discovery.yaml` file content (lowest).
2. `config.d/properties.discovery.yaml` file content.
3. `SPLUNK_DISCOVERY_<xyz>` environment variables available to the collector process.
4. `--set splunk.discovery.<xyz>` commandline options (highest).
105 changes: 72 additions & 33 deletions internal/confmapprovider/discovery/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,22 @@ import (
)

const (
typeService = "service"
typeReceiver = "receiver"
typeExporter = "exporter"
typeExtension = "extension"
typeProcessor = "processor"
typeDiscoveryObserver = "discovery.extension"
typeReceiverToDiscover = "discovery.receiver"
typeService = "service"
typeReceiver = "receiver"
typeExporter = "exporter"
typeExtension = "extension"
typeProcessor = "processor"
typeDiscoveryObserver = "discovery.extension"
typeReceiverToDiscover = "discovery.receiver"
typeDiscoveryProperties = "discovery.properties"
)

var (
defaultType = component.NewID("default")

discoveryDirRegex = fmt.Sprintf("[^%s]*", compilablePathSeparator)
serviceEntryRegex = regexp.MustCompile(fmt.Sprintf("%s%s*service\\.(yaml|yml)$", discoveryDirRegex, compilablePathSeparator))
configDirRootRegex = fmt.Sprintf("[^%s]*", compilablePathSeparator)
serviceEntryRegex = regexp.MustCompile(fmt.Sprintf("%s%s?service\\.(yaml|yml)$", configDirRootRegex, compilablePathSeparator))
discoveryPropertiesEntryRegex = regexp.MustCompile(fmt.Sprintf("%s%s?properties\\.discovery\\.(yaml|yml)$", configDirRootRegex, compilablePathSeparator))

_, exporterEntryRegex = dirAndEntryRegex("exporters")
extensionsDirRegex, extensionEntryRegex = dirAndEntryRegex("extensions")
Expand All @@ -57,22 +59,22 @@ var (
)

// Config is a model for stitching together the final Collector configuration with additional discovery component
// fields for use w/ discovery mode (not yet implemented). It allows individual yaml files to be added to a config.d
// directory and be sourced in the final config such that small changes don't require a central configuration file,
// and possible eliminates the need for one overall (still in design).
// fields for use w/ discovery mode. It allows individual yaml files to be added to a config.d directory and
// be sourced in the final config such that small changes don't apply to a central configuration file,
// and possibly eliminates the need for one overall (still in design and dependent on aliasing and array insertion operators).
type Config struct {
logger *zap.Logger
// Service is for pipelines and final settings
// Service is for pipelines and final settings.
// It must be in the root config directory and named "service.yaml"
Service ServiceEntry
// Exporters is a map of exporters to use in final config.
// They must be in `config.d/exporters` directory.
Exporters map[component.ID]ExporterEntry
// Extensions is a map of extensions to use in final config.
// They must be in `config.d/extensions` directory.
Extensions map[component.ID]ExtensionEntry
// DiscoveryObservers is a map of observer extensions to use in discovery,
// overriding the default settings. They must be in `config.d/extensions` directory
// and end with ".discovery.yaml".
// DiscoveryObservers is a map of observer extensions to use in discovery.
// They must be in `config.d/extensions` directory and end with ".discovery.yaml".
DiscoveryObservers map[component.ID]ExtensionEntry
// Processors is a map of extensions to use in final config.
// They must be in `config.d/processors` directory.
Expand All @@ -84,6 +86,10 @@ type Config struct {
// underlying discovery receiver. They must be in `config.d/receivers` directory and
// end with ".discovery.yaml".
ReceiversToDiscover map[component.ID]ReceiverToDiscoverEntry
// DiscoveryProperties is a mapping of discovery properties to their values for
// configuring discovery mode components.
// It must be in the root config directory and named "properties.discovery.yaml".
DiscoveryProperties PropertiesEntry
}

func NewConfig(logger *zap.Logger) *Config {
Expand All @@ -96,11 +102,12 @@ func NewConfig(logger *zap.Logger) *Config {
Processors: map[component.ID]ProcessorEntry{},
Receivers: map[component.ID]ReceiverEntry{},
ReceiversToDiscover: map[component.ID]ReceiverToDiscoverEntry{},
DiscoveryProperties: PropertiesEntry{Entry{}},
}
}

func dirAndEntryRegex(dirName string) (*regexp.Regexp, *regexp.Regexp) {
dirRegex := regexp.MustCompile(fmt.Sprintf("%s%s*%s", discoveryDirRegex, compilablePathSeparator, dirName))
dirRegex := regexp.MustCompile(fmt.Sprintf("%s%s*%s", configDirRootRegex, compilablePathSeparator, dirName))
entryRegex := regexp.MustCompile(fmt.Sprintf("%s%s[^%s]*\\.(yaml|yml)$", dirRegex, compilablePathSeparator, compilablePathSeparator))
return dirRegex, entryRegex
}
Expand Down Expand Up @@ -210,6 +217,16 @@ func (ReceiverToDiscoverEntry) ErrorF(path string, err error) error {
return errorF(typeReceiverToDiscover, path, err)
}

var _ entryType = (*PropertiesEntry)(nil)

type PropertiesEntry struct {
Entry `yaml:",inline"`
}

func (PropertiesEntry) ErrorF(path string, err error) error {
return errorF(typeDiscoveryProperties, path, err)
}

// Load will walk the file tree from the configDPath root, loading the component
// files as they are discovered, determined by their parent directory and filename.
func (c *Config) Load(configDPath string) error {
Expand Down Expand Up @@ -237,6 +254,11 @@ func (c *Config) LoadFS(dirfs fs.FS) error {
// and unmarshal to the underlying ServiceEntry
tmpSEMap := map[string]ServiceEntry{typeService: c.Service}
return loadEntry(typeService, dirfs, path, tmpSEMap)
case isDiscoveryPropertiesEntryPath(path):
// c.DiscoveryProperties is not a map[string]PropertiesEntry, so we form a tmp
// and unmarshal to the underlying PropertiesEntry
tmpDPMap := map[string]PropertiesEntry{typeDiscoveryProperties: c.DiscoveryProperties}
return loadEntry(typeDiscoveryProperties, dirfs, path, tmpDPMap)
case isExporterEntryPath(path):
return loadEntry(typeExporter, dirfs, path, c.Exporters)
case isExtensionEntryPath(path):
Expand Down Expand Up @@ -265,6 +287,7 @@ func (c *Config) LoadFS(dirfs fs.FS) error {
c.Exporters = nil
c.Processors = nil
c.Extensions = nil
c.DiscoveryProperties = PropertiesEntry{nil}
}
return err
}
Expand Down Expand Up @@ -335,6 +358,10 @@ func isReceiverToDiscoverEntryPath(path string) bool {
return receiverToDiscoverEntryRegex.MatchString(path)
}

func isDiscoveryPropertiesEntryPath(path string) bool {
return discoveryPropertiesEntryRegex.MatchString(path)
}

func loadEntry[K keyType, V entryType](componentType string, fs fs.FS, path string, target map[K]V) error {
tmpDest := map[K]V{}

Expand All @@ -351,16 +378,17 @@ func loadEntry[K keyType, V entryType](componentType string, fs fs.FS, path stri
return nil
}

if componentType == typeService {
// Shallow entry case where resulting entry is not a map[component.ID]Entry
if componentType == typeService || componentType == typeDiscoveryProperties {
// set directly on target and exit
typeServiceK, err := stringToKeyType(componentType, componentID)
typeShallowK, err := stringToKeyType(componentType, componentID)
if err != nil {
return err
}
serviceEntry := target[typeServiceK].Self()
tmpDstSM := tmpDest[typeServiceK].ToStringMap()
shallowEntry := target[typeShallowK].Self()
tmpDstSM := tmpDest[typeShallowK].ToStringMap()
for k, v := range tmpDstSM {
serviceEntry[keyTypeToString(k)] = v
shallowEntry[keyTypeToString(k)] = v
}
return nil
}
Expand All @@ -381,11 +409,14 @@ func unmarshalEntry[K keyType, V entryType](componentType string, fs fs.FS, path

var unmarshalDst any = dst

// service is map[string]ServiceEntry{typeService: ServiceEntry} and we want dst to be &ServiceEntry
if componentType == typeService {
var s any = typeService
// service key is always string so this type assertion is safe
se := (*dst)[s.(K)]
// shallow cases where dst is map[string]Entry{<typeEntry>: <Entry>} but we want it to be &<Entry>
if componentType == typeService || componentType == typeDiscoveryProperties {
var shallowType any = typeService
if componentType == typeDiscoveryProperties {
shallowType = typeDiscoveryProperties
}
// key is always string so this type assertion is safe
se := (*dst)[shallowType.(K)]
unmarshalDst = &se
}

Expand All @@ -394,12 +425,20 @@ func unmarshalEntry[K keyType, V entryType](componentType string, fs fs.FS, path
return
}

if componentType == typeService {
// reset map[string]ServiceEntry dst w/ unmarshalled ServiceEntry and return
var tService any = typeService
var serviceEntry any = *(unmarshalDst.(*ServiceEntry))
(*dst)[tService.(K)] = serviceEntry.(V)
return tService.(K), nil
if componentType == typeService || componentType == typeDiscoveryProperties {
var shallowType any
var entry any
// reset map[string]<EntryType> dst w/ unmarshalled Entry and return
switch componentType {
case typeService:
shallowType = typeService
entry = *(unmarshalDst.(*ServiceEntry))
case typeDiscoveryProperties:
shallowType = typeDiscoveryProperties
entry = *(unmarshalDst.(*PropertiesEntry))
}
(*dst)[shallowType.(K)] = entry.(V)
return shallowType.(K), nil
}

entry := *(unmarshalDst.(*map[K]V))
Expand Down
40 changes: 31 additions & 9 deletions internal/confmapprovider/discovery/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,12 @@ var expectedConfig = Config{
},
},
},
DiscoveryProperties: PropertiesEntry{
Entry: map[string]any{
"splunk.discovery.receivers.smartagent/postgresql.config.params.unused_param": "param_value",
"splunk.discovery.extensions.docker_observer.config.timeout": "1s",
},
},
}

func TestConfig(t *testing.T) {
Expand Down Expand Up @@ -330,13 +336,29 @@ func TestToServiceConfig(t *testing.T) {
require.Equal(t, expectedServiceConfig, sc)
}

func TestConfigWithTwoReceiversInOneFile(t *testing.T) {
configDir := filepath.Join(".", "testdata", "double-receiver-item-config.d")
logger := zap.NewNop()
cfg := NewConfig(logger)
require.NotNil(t, cfg)
err := cfg.Load(configDir)
require.Contains(t, err.Error(), "must contain a single mapping of ComponentID to component but contained [otlp otlp/disallowed]")
cfg.logger = nil // unset for equality check
require.Equal(t, Config{}, *cfg)
func TestInvalidConfigDirContents(t *testing.T) {
for _, test := range []struct {
configDir string
expectedError string
}{
{
configDir: "double-receiver-item-config.d",
expectedError: "must contain a single mapping of ComponentID to component but contained [otlp otlp/disallowed]",
},
{
configDir: "invalid-properties.d",
expectedError: "failed loading discovery.properties from properties.discovery.yaml: failed unmarshalling component discovery.properties: failed parsing \"properties.discovery.yaml\" as yaml",
},
} {
t.Run(test.configDir, func(t *testing.T) {
configDir := filepath.Join(".", "testdata", test.configDir)
logger := zap.NewNop()
cfg := NewConfig(logger)
require.NotNil(t, cfg)
err := cfg.Load(configDir)
require.Contains(t, err.Error(), test.expectedError)
cfg.logger = nil // unset for equality check
require.Equal(t, Config{}, *cfg)
})
}
}
23 changes: 23 additions & 0 deletions internal/confmapprovider/discovery/discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func (d *discoverer) propertiesConfFromEnv() *confmap.Conf {
// discover will create all .discovery.yaml components, start them, wait the configured
// duration, and tear them down before returning the discovery config.
func (d *discoverer) discover(cfg *Config) (map[string]any, error) {
if err := d.mergeDiscoveryPropertiesEntry(cfg); err != nil {
return nil, fmt.Errorf("failed reconciling properties.discovery: %w", err)
}
discoveryReceivers, discoveryObservers, err := d.createDiscoveryReceiversAndObservers(cfg)
if err != nil {
d.logger.Error("failed preparing discovery components", zap.Error(err))
Expand Down Expand Up @@ -682,6 +685,26 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
return nil
}

// mergeDiscoveryPropertiesEntry validates and merges properties.discovery.yaml content with existing sources.
// Priority is discovery.properties.yaml < env var properties < --set properties. --set and env var properties
// are already resolved at this point.
func (d *discoverer) mergeDiscoveryPropertiesEntry(cfg *Config) error {
props := map[string]any{}
for k, v := range cfg.DiscoveryProperties.Entry {
if prop, err := properties.NewProperty(k, fmt.Sprintf("%s", v)); err != nil {
d.logger.Warn(fmt.Sprintf("invalid discovery property %q", k), zap.Error(err))
} else {
mergeMaps(props, prop.ToStringMap())
}
}
fileProps := confmap.NewFromStringMap(props)
if err := fileProps.Merge(d.propertiesConf); err != nil {
return err
}
d.propertiesConf = fileProps
return nil
}

func determineCurrentStatus(current, observed discovery.StatusType) discovery.StatusType {
switch {
case current == discovery.Successful:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
splunk.discovery.receivers.smartagent/postgresql.config.params.unused_param: param_value
splunk.discovery.extensions.docker_observer.config.timeout: 1s
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
invalid
Loading

0 comments on commit f587a65

Please sign in to comment.