Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #28868 to 7.17: Enhance add_kubernetes_metadata matcher #30526

Merged
merged 2 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Filebeat*

- Add support for '/var/log/pods/' path for add_kubernetes_metadata processor with `resource_type: pod`. {pull}28868[28868]
- Add documentation for add_kubernetes_metadata processors `log_path` matcher. {pull}28868[28868]

*Heartbeat*


Expand Down
100 changes: 63 additions & 37 deletions filebeat/processor/add_kubernetes_metadata/matchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,59 +78,78 @@ func newLogsPathMatcher(cfg common.Config) (add_kubernetes_metadata.Matcher, err
// Docker container ID is a 64-character-long hexadecimal string
const containerIdLen = 64

// Pod UID is on the 5th index of the path directories
const podUIDPos = 5

func (f *LogPathMatcher) MetadataIndex(event common.MapStr) string {
value, err := event.GetValue("log.file.path")
if err == nil {
source := value.(string)
f.logger.Debugf("Incoming log.file.path value: %s", source)
if err != nil {
f.logger.Debugf("Error extracting log.file.path from the event: %s.", event)
return ""
}

if !strings.Contains(source, f.LogsPath) {
f.logger.Errorf("Error extracting container id - source value does not contain matcher's logs_path '%s'.", f.LogsPath)
return ""
}
source := value.(string)
f.logger.Debugf("Incoming log.file.path value: %s", source)

if !strings.Contains(source, f.LogsPath) {
f.logger.Errorf("Error extracting container id - source value does not contain matcher's logs_path '%s'.", f.LogsPath)
return ""
}

sourceLen := len(source)
logsPathLen := len(f.LogsPath)
sourceLen := len(source)
logsPathLen := len(f.LogsPath)

if f.ResourceType == "pod" {
// Specify a pod resource type when manually mounting log volumes and they end up under "/var/lib/kubelet/pods/"
// This will extract only the pod UID, which offers less granularity of metadata when compared to the container ID
if strings.HasPrefix(f.LogsPath, podLogsPath()) && strings.HasSuffix(source, ".log") {
if f.ResourceType == "pod" {
// Pod resource type will extract only the pod UID, which offers less granularity of metadata when compared to the container ID
if strings.HasSuffix(source, ".log") {
// Specify a pod resource type when writting logs into manually mounted log volume,
// those logs apper under under "/var/lib/kubelet/pods/<pod_id>/volumes/..."
if strings.HasPrefix(f.LogsPath, podKubeletLogsPath()) {
pathDirs := strings.Split(source, pathSeparator)
podUIDPos := 5
if len(pathDirs) > podUIDPos {
podUID := strings.Split(source, pathSeparator)[podUIDPos]

f.logger.Debugf("Using pod uid: %s", podUID)
return podUID
}

f.logger.Error("Error extracting pod uid - source value contains matcher's logs_path, however it is too short to contain a Pod UID.")
}
} else {
// In case of the Kubernetes log path "/var/log/containers/",
// the container ID will be located right before the ".log" extension.
if strings.HasPrefix(f.LogsPath, containerLogsPath()) && strings.HasSuffix(source, ".log") && sourceLen >= containerIdLen+4 {
containerIDEnd := sourceLen - 4
cid := source[containerIDEnd-containerIdLen : containerIDEnd]
f.logger.Debugf("Using container id: %s", cid)
return cid
}

// In any other case, we assume the container ID will follow right after the log path.
// However we need to check the length to prevent "slice bound out of range" runtime errors.
if sourceLen >= logsPathLen+containerIdLen {
cid := source[logsPathLen : logsPathLen+containerIdLen]
f.logger.Debugf("Using container id: %s", cid)
return cid
// In case of the Kubernetes log path "/var/log/pods/",
// the pod ID will be extracted from the directory name,
// file name example: "/var/log/pods/'<namespace>_<pod_name>_<pod_uid>'/container_name/0.log".
if strings.HasPrefix(f.LogsPath, podLogsPath()) {
pathDirs := strings.Split(source, pathSeparator)
podUIDPos := 4
if len(pathDirs) > podUIDPos {
podUID := strings.Split(pathDirs[podUIDPos], "_")
if len(podUID) > 2 {
f.logger.Debugf("Using pod uid: %s", podUID[2])
return podUID[2]
}
}
}

f.logger.Error("Error extracting container id - source value contains matcher's logs_path, however it is too short to contain a Docker container ID.")
f.logger.Error("Error extracting pod uid - source value does not contains matcher's logs_path")
return ""
}
}
// In case of the Kubernetes log path "/var/log/containers/",
// the container ID will be located right before the ".log" extension.
// file name example: /var/log/containers/<pod_name>_<namespace>_<container_name>-<continer_id>.log
if strings.HasPrefix(f.LogsPath, containerLogsPath()) && strings.HasSuffix(source, ".log") && sourceLen >= containerIdLen+4 {
containerIDEnd := sourceLen - 4
cid := source[containerIDEnd-containerIdLen : containerIDEnd]
f.logger.Debugf("Using container id: %s", cid)
return cid
}

// In any other case, we assume the container ID will follow right after the log path.
// However we need to check the length to prevent "slice bound out of range" runtime errors.
// for the default log path /var/lib/docker/containers/ container ID will follow right after the log path.
// file name example: /var/lib/docker/containers/<container_id>/<container_id>-json.log
if sourceLen >= logsPathLen+containerIdLen {
cid := source[logsPathLen : logsPathLen+containerIdLen]
f.logger.Debugf("Using container id: %s", cid)
return cid
}

f.logger.Error("Error extracting container id - source value contains matcher's logs_path, however it is too short to contain a Docker container ID.")
return ""
}

Expand All @@ -141,13 +160,20 @@ func defaultLogPath() string {
return "/var/lib/docker/containers/"
}

func podLogsPath() string {
func podKubeletLogsPath() string {
if runtime.GOOS == "windows" {
return "C:\\var\\lib\\kubelet\\pods\\"
}
return "/var/lib/kubelet/pods/"
}

func podLogsPath() string {
if runtime.GOOS == "windows" {
return "C:\\var\\log\\pods\\"
}
return "/var/log/pods/"
}

func containerLogsPath() string {
if runtime.GOOS == "windows" {
return "C:\\var\\log\\containers\\"
Expand Down
30 changes: 30 additions & 0 deletions filebeat/processor/add_kubernetes_metadata/matchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,36 @@ func TestLogsPathMatcher_InvalidSource4(t *testing.T) {
executeTestWithResourceType(t, cfgLogsPath, cfgResourceType, source, expectedResult)
}

func TestLogsPathMatcher_InvalidVarLogPodSource(t *testing.T) {
cfgLogsPath := "/var/log/pods/"
cfgResourceType := "pod"
source := fmt.Sprintf("/invalid/dir/namespace_pod-name_%s/container/0.log", puid)
expectedResult := ""
executeTestWithResourceType(t, cfgLogsPath, cfgResourceType, source, expectedResult)
}

func TestLogsPathMatcher_InvalidVarLogPodIDFormat(t *testing.T) {
cfgLogsPath := "/var/log/pods/"
cfgResourceType := "pod"
source := fmt.Sprintf("/var/log/pods/%s/container/0.log", puid)
expectedResult := ""
executeTestWithResourceType(t, cfgLogsPath, cfgResourceType, source, expectedResult)
}

func TestLogsPathMatcher_ValidVarLogPod(t *testing.T) {
cfgLogsPath := "/var/log/pods/"
cfgResourceType := "pod"
sourcePath := "/var/log/pods/namespace_pod-name_%s/container/0.log"

if runtime.GOOS == "windows" {
cfgLogsPath = "C:\\var\\log\\pods\\"
sourcePath = "C:\\var\\log\\pods\\namespace_pod-name_%s\\container\\0.log"
}
source := fmt.Sprintf(sourcePath, puid)
expectedResult := puid
executeTestWithResourceType(t, cfgLogsPath, cfgResourceType, source, expectedResult)
}

func executeTest(t *testing.T, cfgLogsPath string, source string, expectedResult string) {
executeTestWithResourceType(t, cfgLogsPath, "", source, expectedResult)
}
Expand Down
32 changes: 32 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,37 @@ func (k *kubeAnnotatorConfig) Validate() error {
k.Host = ""
}

// Checks below were added to warn the users early on and avoid initialising the processor in case the `logs_path`
// matcher config is not valid: supported paths defined as a `logs_path` configuration setting are strictly defined
// if `resource_type` is set
for _, matcher := range k.Matchers {
if matcherCfg, ok := matcher["logs_path"]; ok {
if matcherCfg.HasField("resource_type") {
logsPathMatcher := struct {
LogsPath string `config:"logs_path"`
ResourceType string `config:"resource_type"`
}{}

err := matcherCfg.Unpack(&logsPathMatcher)
if err != nil {
return fmt.Errorf("fail to unpack the `logs_path` matcher configuration: %s", err)
}
if logsPathMatcher.LogsPath == "" {
return fmt.Errorf("invalid logs_path matcher configuration: when resource_type is defined, logs_path must be set as well")
}
if logsPathMatcher.ResourceType != "pod" && logsPathMatcher.ResourceType != "container" {
return fmt.Errorf("invalid resource_type %s, valid values include `pod`, `container`", logsPathMatcher.ResourceType)
}
if logsPathMatcher.ResourceType == "pod" && !(logsPathMatcher.LogsPath == "/var/lib/kubelet/pods/" || logsPathMatcher.LogsPath == "/var/log/pods/") {
return fmt.Errorf("invalid logs_path defined for resource_type: %s, valid values include `/var/lib/kubelet/pods/`, `/var/log/pods/`", logsPathMatcher.ResourceType)
}
if logsPathMatcher.ResourceType == "container" && logsPathMatcher.LogsPath != "/var/log/containers/" {
return fmt.Errorf("invalid logs_path defined for resource_type: %s, valid value is `/var/log/containers/`", logsPathMatcher.ResourceType)
}
}

}
}

return nil
}
73 changes: 73 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,76 @@ func TestConfigValidate(t *testing.T) {
}
}
}

func TestConfigValidate_LogsPatchMatcher(t *testing.T) {
tests := []struct {
matcherName string
matcherConfig map[string]interface{}
error bool
}{
{
matcherName: "",
matcherConfig: map[string]interface{}{},
error: false,
},
{
matcherName: "logs_path",
matcherConfig: map[string]interface{}{
"resource_type": "pod",
},
error: true,
},
{
matcherName: "logs_path",
matcherConfig: map[string]interface{}{
"resource_type": "pod",
"invalid_field": "invalid_value",
},
error: true,
},
{
matcherName: "logs_path",
matcherConfig: map[string]interface{}{
"resource_type": "pod",
"logs_path": "/var/log/invalid/path/",
},
error: true,
},
{
matcherName: "logs_path",
matcherConfig: map[string]interface{}{
"resource_type": "pod",
"logs_path": "/var/log/pods/",
},
error: false,
},
{
matcherName: "logs_path",
matcherConfig: map[string]interface{}{
"resource_type": "container",
"logs_path": "/var/log/containers/",
},
error: false,
},
}

for _, test := range tests {
cfg, _ := common.NewConfigFrom(test.matcherConfig)

c := defaultKubernetesAnnotatorConfig()
c.DefaultMatchers = Enabled{false}

err := cfg.Unpack(&c)
c.Matchers = PluginConfig{
{
test.matcherName: *cfg,
},
}
err = c.Validate()
if test.error {
require.NotNil(t, err)
} else {
require.Nil(t, err)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,29 @@ the `log.file.path` field.
This matcher has the following configuration settings:

`logs_path`:: (Optional) Base path of container logs. If not specified, it uses
the default logs path of the platform where {beatname_uc} is running.
`resource_type`:: (Optional) Type of the resource to obtain the ID of. It can be
`pod`, to make the lookup based on the pod UID, or `container`, to make the
lookup based on the container ID. It defaults to `container`.
the default logs path of the platform where {beatname_uc} is running: for Linux -
`/var/lib/docker/containers/`, Windows - `C:\\ProgramData\\Docker\\containers`.
To change the default value: container ID must follow right after the `logs_path` -
`<log_path>/<container_id>`, where `container_id` is a 64-character-long
hexadecimal string.

`resource_type`:: (Optional) Type of the resource to obtain the ID of.
Valid `resource_type`:
* `pod`: to make the lookup based on the pod UID. When `resource_type` is set to
`pod`, `logs_path` must be set as well, supported path in this case:
** `/var/lib/kubelet/pods/` used to read logs from mounted into the pod volumes,
those logs end up under `/var/lib/kubelet/pods/<pod UID>/volumes/<volume name>/...`
To use `/var/lib/kubelet/pods/` as a `log_path`, `/var/lib/kubelet/pods` must be
mounted into the filebeat Pods.
** `/var/log/pods/`
Note: when using `resource_type: 'pod'` logs will be enriched only with pod
metadata: pod id, pod name, etc., not container metadata.
*`container`: to make the lookup based on the container ID, `logs_path` must
be set to `/var/log/containers/`.
It defaults to `container`.

To be able to use `logs_path` matcher filebeat input path must be a subdirectory
of directory defined in `logs_path` configuration setting.

The default configuration is able to lookup the metadata using the container ID
when the logs are collected from the default docker logs path
Expand Down