From f048c5148b75f5c84a73181d7e6ad954fbf44eed Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Tue, 28 Sep 2021 14:55:54 -0700 Subject: [PATCH] [k8sattributes processor] Add container metadata (#5467) * [k8sattributes processor] Add optional container metadata This change provides an option to fetch container metadata from k8s API in addition to k8s pod metadata. The following attributes now can be automatically added by the k8sattributes processor: - container.image.name - container.image.tag - container.id `container.image.name` and `container.image.tag` require additional container identifier present in resource attributes: `container.name`. `container.id` requires additional container run identifiers present in resource attributes: `container.name` and `run_id`. `run_id` identified is a subject to change, see https://github.com/open-telemetry/opentelemetry-specification/pull/1945 * Make linter happy * Make container attributes enabled by default --- processor/k8sattributesprocessor/doc.go | 24 ++- .../k8sattributesprocessor/kube/client.go | 49 +++++ .../kube/client_test.go | 160 ++++++++++++++ processor/k8sattributesprocessor/kube/kube.go | 34 ++- processor/k8sattributesprocessor/options.go | 9 + processor/k8sattributesprocessor/processor.go | 61 +++++- .../k8sattributesprocessor/processor_test.go | 197 ++++++++++++++++++ 7 files changed, 519 insertions(+), 15 deletions(-) diff --git a/processor/k8sattributesprocessor/doc.go b/processor/k8sattributesprocessor/doc.go index 558a6abcbdb4..af78455787f4 100644 --- a/processor/k8sattributesprocessor/doc.go +++ b/processor/k8sattributesprocessor/doc.go @@ -40,7 +40,29 @@ // // If Pod association rules are not configured resources are associated with metadata only by connection's IP Address. // -// +// Which metadata to collect is determined by `metadata` configuration that defines list of resource attributes +// to be added. Items in the list called exactly the same as the resource attributes that will be added. +// All the available attributes are enabled by default, you can reduce the list with `metadata` configuration. +// The following attributes will be added if pod identified: +// - k8s.namespace.name +// - k8s.pod.name +// - k8s.pod.uid +// - k8s.pod.start_time +// - k8s.deployment.name +// - k8s.cluster.name +// - k8s.node.name +// Not all the attributes are guaranteed to be added. For example `k8s.cluster.name` usually is not provided by k8s API, +// so likely it won't be set as an attribute. + +// The following container level attributes require additional attributes to identify a particular container in a pod: +// 1. Container spec attributes - will be set only if container identifying attribute `container.name` is set +// as a resource attribute (similar to all other attributes, pod has to be identified as well): +// - container.image.name +// - container.image.tag +// 2. Container status attributes - in addition to pod identifier and `container.name` attribute, these attributes +// require identifier of a particular container run set as `run_id` in resource attributes: +// - container.id + //The k8sattributesprocessor can be used for automatic tagging of spans, metrics and logs with k8s labels and annotations from pods and namespaces. //The config for associating the data passing through the processor (spans, metrics and logs) with specific Pod/Namespace annotations/labels is configured via "annotations" and "labels" keys. //This config represents a list of annotations/labels that are extracted from pods/namespaces and added to spans, metrics and logs. diff --git a/processor/k8sattributesprocessor/kube/client.go b/processor/k8sattributesprocessor/kube/client.go index d623525d3b7a..5f2801f61cfb 100644 --- a/processor/k8sattributesprocessor/kube/client.go +++ b/processor/k8sattributesprocessor/kube/client.go @@ -330,6 +330,48 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { return tags } +func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) map[string]*Container { + containers := map[string]*Container{} + + if c.Rules.ContainerImageName || c.Rules.ContainerImageTag { + for _, spec := range append(pod.Spec.Containers, pod.Spec.InitContainers...) { + container := &Container{} + imageParts := strings.Split(spec.Image, ":") + if c.Rules.ContainerImageName { + container.ImageName = imageParts[0] + } + if c.Rules.ContainerImageTag && len(imageParts) > 1 { + container.ImageTag = imageParts[1] + } + containers[spec.Name] = container + } + } + + if c.Rules.ContainerID { + for _, apiStatus := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { + container, ok := containers[apiStatus.Name] + if !ok { + container = &Container{} + containers[apiStatus.Name] = container + } + if container.Statuses == nil { + container.Statuses = map[int]ContainerStatus{} + } + + containerID := apiStatus.ContainerID + + // Remove container runtime prefix + idParts := strings.Split(containerID, "://") + if len(idParts) == 2 { + containerID = idParts[1] + } + + container.Statuses[int(apiStatus.RestartCount)] = ContainerStatus{containerID} + } + } + return containers +} + func (c *WatchClient) extractNamespaceAttributes(namespace *api_v1.Namespace) map[string]string { tags := map[string]string{} @@ -378,6 +420,9 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) { newPod.Ignore = true } else { newPod.Attributes = c.extractPodAttributes(pod) + if needContainerAttributes(c.Rules) { + newPod.Containers = c.extractPodContainersAttributes(pod) + } } c.m.Lock() @@ -513,3 +558,7 @@ func (c *WatchClient) extractNamespaceLabelsAnnotations() bool { return false } + +func needContainerAttributes(rules ExtractionRules) bool { + return rules.ContainerImageName || rules.ContainerImageTag || rules.ContainerID +} diff --git a/processor/k8sattributesprocessor/kube/client_test.go b/processor/k8sattributesprocessor/kube/client_test.go index c8e1ebe5f654..f57686b5bf4e 100644 --- a/processor/k8sattributesprocessor/kube/client_test.go +++ b/processor/k8sattributesprocessor/kube/client_test.go @@ -747,6 +747,166 @@ func TestPodIgnorePatterns(t *testing.T) { } } +func Test_extractPodContainersAttributes(t *testing.T) { + pod := api_v1.Pod{ + Spec: api_v1.PodSpec{ + Containers: []api_v1.Container{ + { + Name: "container1", + Image: "test/image1:0.1.0", + }, + { + Name: "container2", + Image: "test/image2:0.2.0", + }, + }, + InitContainers: []api_v1.Container{ + { + Name: "init_container", + Image: "test/init-image:1.0.2", + }, + }, + }, + Status: api_v1.PodStatus{ + ContainerStatuses: []api_v1.ContainerStatus{ + { + Name: "container1", + ContainerID: "docker://container1-id-123", + RestartCount: 0, + }, + { + Name: "container2", + ContainerID: "docker://container2-id-456", + RestartCount: 2, + }, + }, + InitContainerStatuses: []api_v1.ContainerStatus{ + { + Name: "init_container", + ContainerID: "containerd://init-container-id-123", + RestartCount: 0, + }, + }, + }, + } + tests := []struct { + name string + rules ExtractionRules + pod api_v1.Pod + want map[string]*Container + }{ + { + name: "no-data", + rules: ExtractionRules{ + ContainerImageName: true, + ContainerImageTag: true, + ContainerID: true, + }, + pod: api_v1.Pod{}, + want: map[string]*Container{}, + }, + { + name: "no-rules", + rules: ExtractionRules{}, + pod: pod, + want: map[string]*Container{}, + }, + { + name: "image-name-only", + rules: ExtractionRules{ + ContainerImageName: true, + }, + pod: pod, + want: map[string]*Container{ + "container1": {ImageName: "test/image1"}, + "container2": {ImageName: "test/image2"}, + "init_container": {ImageName: "test/init-image"}, + }, + }, + { + name: "no-image-tag-available", + rules: ExtractionRules{ + ContainerImageName: true, + }, + pod: api_v1.Pod{ + Spec: api_v1.PodSpec{ + Containers: []api_v1.Container{ + { + Name: "test-container", + Image: "test/image", + }, + }, + }, + }, + want: map[string]*Container{ + "test-container": {ImageName: "test/image"}, + }, + }, + { + name: "container-id-only", + rules: ExtractionRules{ + ContainerID: true, + }, + pod: pod, + want: map[string]*Container{ + "container1": { + Statuses: map[int]ContainerStatus{ + 0: {ContainerID: "container1-id-123"}, + }, + }, + "container2": { + Statuses: map[int]ContainerStatus{ + 2: {ContainerID: "container2-id-456"}, + }, + }, + "init_container": { + Statuses: map[int]ContainerStatus{ + 0: {ContainerID: "init-container-id-123"}, + }, + }, + }, + }, + { + name: "all-container-attributes", + rules: ExtractionRules{ + ContainerImageName: true, + ContainerImageTag: true, + ContainerID: true, + }, + pod: pod, + want: map[string]*Container{ + "container1": { + ImageName: "test/image1", + ImageTag: "0.1.0", + Statuses: map[int]ContainerStatus{ + 0: {ContainerID: "container1-id-123"}, + }, + }, + "container2": { + ImageName: "test/image2", + ImageTag: "0.2.0", + Statuses: map[int]ContainerStatus{ + 2: {ContainerID: "container2-id-456"}, + }, + }, + "init_container": { + ImageName: "test/init-image", + ImageTag: "1.0.2", + Statuses: map[int]ContainerStatus{ + 0: {ContainerID: "init-container-id-123"}, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := WatchClient{Rules: tt.rules} + assert.Equal(t, tt.want, c.extractPodContainersAttributes(&tt.pod)) + }) + } +} + func Test_extractField(t *testing.T) { c := WatchClient{} type args struct { diff --git a/processor/k8sattributesprocessor/kube/kube.go b/processor/k8sattributesprocessor/kube/kube.go index dd981746e1ef..80b938177b59 100644 --- a/processor/k8sattributesprocessor/kube/kube.go +++ b/processor/k8sattributesprocessor/kube/kube.go @@ -71,9 +71,26 @@ type Pod struct { Ignore bool Namespace string + // Containers is a map of container name to Container struct. + Containers map[string]*Container + DeletedAt time.Time } +// Container stores resource attributes for a specific container defined by k8s pod spec. +type Container struct { + ImageName string + ImageTag string + + // Statuses is a map of container run_id (restart count) attribute to ContainerStatus struct. + Statuses map[int]ContainerStatus +} + +// ContainerStatus stores resource attributes for a particular container run defined by k8s pod status. +type ContainerStatus struct { + ContainerID string +} + // Namespace represents a kubernetes namespace. type Namespace struct { Name string @@ -118,13 +135,16 @@ type FieldFilter struct { // ExtractionRules is used to specify the information that needs to be extracted // from pods and added to the spans as tags. type ExtractionRules struct { - Deployment bool - Namespace bool - PodName bool - PodUID bool - Node bool - Cluster bool - StartTime bool + Deployment bool + Namespace bool + PodName bool + PodUID bool + Node bool + Cluster bool + StartTime bool + ContainerID bool + ContainerImageName bool + ContainerImageTag bool Annotations []FieldExtractionRule Labels []FieldExtractionRule diff --git a/processor/k8sattributesprocessor/options.go b/processor/k8sattributesprocessor/options.go index 71178b781c4a..296e09d33fd1 100644 --- a/processor/k8sattributesprocessor/options.go +++ b/processor/k8sattributesprocessor/options.go @@ -78,6 +78,9 @@ func WithExtractMetadata(fields ...string) Option { conventions.AttributeK8SDeploymentName, conventions.AttributeK8SClusterName, conventions.AttributeK8SNodeName, + conventions.AttributeContainerID, + conventions.AttributeContainerImageName, + conventions.AttributeContainerImageTag, } } for _, field := range fields { @@ -99,6 +102,12 @@ func WithExtractMetadata(fields ...string) Option { p.rules.Cluster = true case metadataNode, conventions.AttributeK8SNodeName: p.rules.Node = true + case conventions.AttributeContainerID: + p.rules.ContainerID = true + case conventions.AttributeContainerImageName: + p.rules.ContainerImageName = true + case conventions.AttributeContainerImageTag: + p.rules.ContainerImageTag = true default: return fmt.Errorf("\"%s\" is not a supported metadata field", field) } diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index cb60769ad57c..2bd19b8ca591 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -16,6 +16,8 @@ package k8sattributesprocessor import ( "context" + "fmt" + "strconv" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/model/pdata" @@ -29,6 +31,10 @@ import ( const ( k8sIPLabelName string = "k8s.pod.ip" clientIPLabelName string = "ip" + + // TODO: update the label to semantic convention defined in this PR: + // https://github.com/open-telemetry/opentelemetry-specification/pull/1945 + k8sContainerRunIDLabelName string = "run_id" ) type kubernetesprocessor struct { @@ -117,9 +123,11 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pda } if podIdentifierKey != "" { - attrsToAdd := kp.getAttributesForPod(podIdentifierValue) - for key, val := range attrsToAdd { - resource.Attributes().InsertString(key, val) + if pod, ok := kp.kc.GetPod(podIdentifierValue); ok { + for key, val := range pod.Attributes { + resource.Attributes().InsertString(key, val) + } + kp.addContainerAttributes(resource.Attributes(), pod) } } @@ -131,12 +139,35 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pda } } -func (kp *kubernetesprocessor) getAttributesForPod(identifier kube.PodIdentifier) map[string]string { - pod, ok := kp.kc.GetPod(identifier) +// addContainerAttributes looks if pod has any container identifiers and adds additional container attributes +func (kp *kubernetesprocessor) addContainerAttributes(attrs pdata.AttributeMap, pod *kube.Pod) { + containerName := stringAttributeFromMap(attrs, conventions.AttributeK8SContainerName) + if containerName == "" { + return + } + containerSpec, ok := pod.Containers[containerName] if !ok { - return nil + return + } + + if containerSpec.ImageName != "" { + attrs.InsertString(conventions.AttributeContainerImageName, containerSpec.ImageName) + } + if containerSpec.ImageTag != "" { + attrs.InsertString(conventions.AttributeContainerImageTag, containerSpec.ImageTag) + } + + runIDAttr, ok := attrs.Get(k8sContainerRunIDLabelName) + if ok { + runID, err := intFromAttribute(runIDAttr) + if err == nil { + if containerStatus, ok := containerSpec.Statuses[runID]; ok && containerStatus.ContainerID != "" { + attrs.InsertString(conventions.AttributeContainerID, containerStatus.ContainerID) + } + } else { + kp.logger.Debug(err.Error()) + } } - return pod.Attributes } func (kp *kubernetesprocessor) getAttributesForPodsNamespace(namespace string) map[string]string { @@ -146,3 +177,19 @@ func (kp *kubernetesprocessor) getAttributesForPodsNamespace(namespace string) m } return ns.Attributes } + +// intFromAttribute extracts int value from an attribute stored as string or int +func intFromAttribute(val pdata.AttributeValue) (int, error) { + switch val.Type() { + case pdata.AttributeValueTypeInt: + return int(val.IntVal()), nil + case pdata.AttributeValueTypeString: + i, err := strconv.Atoi(val.StringVal()) + if err != nil { + return 0, err + } + return i, nil + default: + return 0, fmt.Errorf("wrong attribute type %v, expected int", val.Type()) + } +} diff --git a/processor/k8sattributesprocessor/processor_test.go b/processor/k8sattributesprocessor/processor_test.go index bac4301122d1..f4c0f031b8ed 100644 --- a/processor/k8sattributesprocessor/processor_test.go +++ b/processor/k8sattributesprocessor/processor_test.go @@ -283,6 +283,18 @@ func withPodUID(uid string) generateResourceFunc { } } +func withContainerName(containerName string) generateResourceFunc { + return func(res pdata.Resource) { + res.Attributes().InsertString(conventions.AttributeK8SContainerName, containerName) + } +} + +func withContainerRunID(containerRunID string) generateResourceFunc { + return func(res pdata.Resource) { + res.Attributes().InsertString(k8sContainerRunIDLabelName, containerRunID) + } +} + func TestIPDetectionFromContext(t *testing.T) { m := newMultiTest(t, NewFactory().CreateDefaultConfig(), nil) @@ -652,6 +664,146 @@ func TestProcessorAddLabels(t *testing.T) { } } +func TestProcessorAddContainerAttributes(t *testing.T) { + tests := []struct { + name string + op func(kp *kubernetesprocessor) + resourceGens []generateResourceFunc + wantAttrs map[string]string + }{ + { + name: "image-only", + op: func(kp *kubernetesprocessor) { + kp.podAssociations = []kube.Association{ + { + From: "resource_attribute", + Name: "k8s.pod.uid", + }, + } + kp.kc.(*fakeClient).Pods[kube.PodIdentifier("19f651bc-73e4-410f-b3e9-f0241679d3b8")] = &kube.Pod{ + Containers: map[string]*kube.Container{ + "app": { + ImageName: "test/app", + ImageTag: "1.0.1", + }, + }, + } + }, + resourceGens: []generateResourceFunc{ + withPodUID("19f651bc-73e4-410f-b3e9-f0241679d3b8"), + withContainerName("app"), + }, + wantAttrs: map[string]string{ + conventions.AttributeK8SPodUID: "19f651bc-73e4-410f-b3e9-f0241679d3b8", + conventions.AttributeK8SContainerName: "app", + conventions.AttributeContainerImageName: "test/app", + conventions.AttributeContainerImageTag: "1.0.1", + }, + }, + { + name: "container-id-only", + op: func(kp *kubernetesprocessor) { + kp.kc.(*fakeClient).Pods[kube.PodIdentifier("1.1.1.1")] = &kube.Pod{ + Containers: map[string]*kube.Container{ + "app": { + Statuses: map[int]kube.ContainerStatus{ + 0: {ContainerID: "fcd58c97330c1dc6615bd520031f6a703a7317cd92adc96013c4dd57daad0b5f"}, + 1: {ContainerID: "6a7f1a598b5dafec9c193f8f8d63f6e5839b8b0acd2fe780f94285e26c05580e"}, + }, + }, + }, + } + }, + resourceGens: []generateResourceFunc{ + withPassthroughIP("1.1.1.1"), + withContainerName("app"), + withContainerRunID("1"), + }, + wantAttrs: map[string]string{ + k8sIPLabelName: "1.1.1.1", + conventions.AttributeK8SContainerName: "app", + k8sContainerRunIDLabelName: "1", + conventions.AttributeContainerID: "6a7f1a598b5dafec9c193f8f8d63f6e5839b8b0acd2fe780f94285e26c05580e", + }, + }, + { + name: "container-name-mismatch", + op: func(kp *kubernetesprocessor) { + kp.kc.(*fakeClient).Pods[kube.PodIdentifier("1.1.1.1")] = &kube.Pod{ + Containers: map[string]*kube.Container{ + "app": { + ImageName: "test/app", + ImageTag: "1.0.1", + Statuses: map[int]kube.ContainerStatus{ + 0: {ContainerID: "fcd58c97330c1dc6615bd520031f6a703a7317cd92adc96013c4dd57daad0b5f"}, + }, + }, + }, + } + }, + resourceGens: []generateResourceFunc{ + withPassthroughIP("1.1.1.1"), + withContainerName("new-app"), + withContainerRunID("0"), + }, + wantAttrs: map[string]string{ + k8sIPLabelName: "1.1.1.1", + conventions.AttributeK8SContainerName: "new-app", + k8sContainerRunIDLabelName: "0", + }, + }, + { + name: "container-run-id-mismatch", + op: func(kp *kubernetesprocessor) { + kp.kc.(*fakeClient).Pods[kube.PodIdentifier("1.1.1.1")] = &kube.Pod{ + Containers: map[string]*kube.Container{ + "app": { + ImageName: "test/app", + Statuses: map[int]kube.ContainerStatus{ + 0: {ContainerID: "fcd58c97330c1dc6615bd520031f6a703a7317cd92adc96013c4dd57daad0b5f"}, + }, + }, + }, + } + }, + resourceGens: []generateResourceFunc{ + withPassthroughIP("1.1.1.1"), + withContainerName("app"), + withContainerRunID("1"), + }, + wantAttrs: map[string]string{ + k8sIPLabelName: "1.1.1.1", + conventions.AttributeK8SContainerName: "app", + k8sContainerRunIDLabelName: "1", + conventions.AttributeContainerImageName: "test/app", + }, + }, + } + + for _, tt := range tests { + m := newMultiTest( + t, + NewFactory().CreateDefaultConfig(), + nil, + ) + m.kubernetesProcessorOperation(tt.op) + m.testConsume(context.Background(), + generateTraces(tt.resourceGens...), + generateMetrics(tt.resourceGens...), + generateLogs(tt.resourceGens...), + nil, + ) + + m.assertBatchesLen(1) + m.assertResource(0, func(r pdata.Resource) { + require.Equal(t, len(tt.wantAttrs), r.Attributes().Len()) + for k, v := range tt.wantAttrs { + assertResourceHasStringAttribute(t, r, k, v) + } + }) + } +} + func TestProcessorPicksUpPassthoughPodIp(t *testing.T) { m := newMultiTest( t, @@ -908,3 +1060,48 @@ func assertResourceHasStringAttribute(t *testing.T, r pdata.Resource, k, v strin assert.EqualValues(t, pdata.AttributeValueTypeString, got.Type(), "attribute %s is not of type string", k) assert.EqualValues(t, v, got.StringVal(), "attribute %s is not equal to %s", k, v) } + +func Test_intFromAttribute(t *testing.T) { + tests := []struct { + name string + attrVal pdata.AttributeValue + wantInt int + wantErr bool + }{ + { + name: "wrong-type", + attrVal: pdata.NewAttributeValueBool(true), + wantInt: 0, + wantErr: true, + }, + { + name: "wrong-string-number", + attrVal: pdata.NewAttributeValueString("NaN"), + wantInt: 0, + wantErr: true, + }, + { + name: "valid-string-number", + attrVal: pdata.NewAttributeValueString("3"), + wantInt: 3, + wantErr: false, + }, + { + name: "valid-int-number", + attrVal: pdata.NewAttributeValueInt(1), + wantInt: 1, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := intFromAttribute(tt.attrVal) + assert.Equal(t, tt.wantInt, got) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +}