diff --git a/processor/k8sattributesprocessor/doc.go b/processor/k8sattributesprocessor/doc.go index 558a6abcbdb4..af78455787f4 100644 --- a/processor/k8sattributesprocessor/doc.go +++ b/processor/k8sattributesprocessor/doc.go @@ -40,7 +40,29 @@ // // If Pod association rules are not configured resources are associated with metadata only by connection's IP Address. // -// +// Which metadata to collect is determined by `metadata` configuration that defines list of resource attributes +// to be added. Items in the list called exactly the same as the resource attributes that will be added. +// All the available attributes are enabled by default, you can reduce the list with `metadata` configuration. +// The following attributes will be added if pod identified: +// - k8s.namespace.name +// - k8s.pod.name +// - k8s.pod.uid +// - k8s.pod.start_time +// - k8s.deployment.name +// - k8s.cluster.name +// - k8s.node.name +// Not all the attributes are guaranteed to be added. For example `k8s.cluster.name` usually is not provided by k8s API, +// so likely it won't be set as an attribute. + +// The following container level attributes require additional attributes to identify a particular container in a pod: +// 1. Container spec attributes - will be set only if container identifying attribute `container.name` is set +// as a resource attribute (similar to all other attributes, pod has to be identified as well): +// - container.image.name +// - container.image.tag +// 2. Container status attributes - in addition to pod identifier and `container.name` attribute, these attributes +// require identifier of a particular container run set as `run_id` in resource attributes: +// - container.id + //The k8sattributesprocessor can be used for automatic tagging of spans, metrics and logs with k8s labels and annotations from pods and namespaces. //The config for associating the data passing through the processor (spans, metrics and logs) with specific Pod/Namespace annotations/labels is configured via "annotations" and "labels" keys. //This config represents a list of annotations/labels that are extracted from pods/namespaces and added to spans, metrics and logs. diff --git a/processor/k8sattributesprocessor/kube/client.go b/processor/k8sattributesprocessor/kube/client.go index d623525d3b7a..5f2801f61cfb 100644 --- a/processor/k8sattributesprocessor/kube/client.go +++ b/processor/k8sattributesprocessor/kube/client.go @@ -330,6 +330,48 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { return tags } +func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) map[string]*Container { + containers := map[string]*Container{} + + if c.Rules.ContainerImageName || c.Rules.ContainerImageTag { + for _, spec := range append(pod.Spec.Containers, pod.Spec.InitContainers...) { + container := &Container{} + imageParts := strings.Split(spec.Image, ":") + if c.Rules.ContainerImageName { + container.ImageName = imageParts[0] + } + if c.Rules.ContainerImageTag && len(imageParts) > 1 { + container.ImageTag = imageParts[1] + } + containers[spec.Name] = container + } + } + + if c.Rules.ContainerID { + for _, apiStatus := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { + container, ok := containers[apiStatus.Name] + if !ok { + container = &Container{} + containers[apiStatus.Name] = container + } + if container.Statuses == nil { + container.Statuses = map[int]ContainerStatus{} + } + + containerID := apiStatus.ContainerID + + // Remove container runtime prefix + idParts := strings.Split(containerID, "://") + if len(idParts) == 2 { + containerID = idParts[1] + } + + container.Statuses[int(apiStatus.RestartCount)] = ContainerStatus{containerID} + } + } + return containers +} + func (c *WatchClient) extractNamespaceAttributes(namespace *api_v1.Namespace) map[string]string { tags := map[string]string{} @@ -378,6 +420,9 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) { newPod.Ignore = true } else { newPod.Attributes = c.extractPodAttributes(pod) + if needContainerAttributes(c.Rules) { + newPod.Containers = c.extractPodContainersAttributes(pod) + } } c.m.Lock() @@ -513,3 +558,7 @@ func (c *WatchClient) extractNamespaceLabelsAnnotations() bool { return false } + +func needContainerAttributes(rules ExtractionRules) bool { + return rules.ContainerImageName || rules.ContainerImageTag || rules.ContainerID +} diff --git a/processor/k8sattributesprocessor/kube/client_test.go b/processor/k8sattributesprocessor/kube/client_test.go index c8e1ebe5f654..f57686b5bf4e 100644 --- a/processor/k8sattributesprocessor/kube/client_test.go +++ b/processor/k8sattributesprocessor/kube/client_test.go @@ -747,6 +747,166 @@ func TestPodIgnorePatterns(t *testing.T) { } } +func Test_extractPodContainersAttributes(t *testing.T) { + pod := api_v1.Pod{ + Spec: api_v1.PodSpec{ + Containers: []api_v1.Container{ + { + Name: "container1", + Image: "test/image1:0.1.0", + }, + { + Name: "container2", + Image: "test/image2:0.2.0", + }, + }, + InitContainers: []api_v1.Container{ + { + Name: "init_container", + Image: "test/init-image:1.0.2", + }, + }, + }, + Status: api_v1.PodStatus{ + ContainerStatuses: []api_v1.ContainerStatus{ + { + Name: "container1", + ContainerID: "docker://container1-id-123", + RestartCount: 0, + }, + { + Name: "container2", + ContainerID: "docker://container2-id-456", + RestartCount: 2, + }, + }, + InitContainerStatuses: []api_v1.ContainerStatus{ + { + Name: "init_container", + ContainerID: "containerd://init-container-id-123", + RestartCount: 0, + }, + }, + }, + } + tests := []struct { + name string + rules ExtractionRules + pod api_v1.Pod + want map[string]*Container + }{ + { + name: "no-data", + rules: ExtractionRules{ + ContainerImageName: true, + ContainerImageTag: true, + ContainerID: true, + }, + pod: api_v1.Pod{}, + want: map[string]*Container{}, + }, + { + name: "no-rules", + rules: ExtractionRules{}, + pod: pod, + want: map[string]*Container{}, + }, + { + name: "image-name-only", + rules: ExtractionRules{ + ContainerImageName: true, + }, + pod: pod, + want: map[string]*Container{ + "container1": {ImageName: "test/image1"}, + "container2": {ImageName: "test/image2"}, + "init_container": {ImageName: "test/init-image"}, + }, + }, + { + name: "no-image-tag-available", + rules: ExtractionRules{ + ContainerImageName: true, + }, + pod: api_v1.Pod{ + Spec: api_v1.PodSpec{ + Containers: []api_v1.Container{ + { + Name: "test-container", + Image: "test/image", + }, + }, + }, + }, + want: map[string]*Container{ + "test-container": {ImageName: "test/image"}, + }, + }, + { + name: "container-id-only", + rules: ExtractionRules{ + ContainerID: true, + }, + pod: pod, + want: map[string]*Container{ + "container1": { + Statuses: map[int]ContainerStatus{ + 0: {ContainerID: "container1-id-123"}, + }, + }, + "container2": { + Statuses: map[int]ContainerStatus{ + 2: {ContainerID: "container2-id-456"}, + }, + }, + "init_container": { + Statuses: map[int]ContainerStatus{ + 0: {ContainerID: "init-container-id-123"}, + }, + }, + }, + }, + { + name: "all-container-attributes", + rules: ExtractionRules{ + ContainerImageName: true, + ContainerImageTag: true, + ContainerID: true, + }, + pod: pod, + want: map[string]*Container{ + "container1": { + ImageName: "test/image1", + ImageTag: "0.1.0", + Statuses: map[int]ContainerStatus{ + 0: {ContainerID: "container1-id-123"}, + }, + }, + "container2": { + ImageName: "test/image2", + ImageTag: "0.2.0", + Statuses: map[int]ContainerStatus{ + 2: {ContainerID: "container2-id-456"}, + }, + }, + "init_container": { + ImageName: "test/init-image", + ImageTag: "1.0.2", + Statuses: map[int]ContainerStatus{ + 0: {ContainerID: "init-container-id-123"}, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := WatchClient{Rules: tt.rules} + assert.Equal(t, tt.want, c.extractPodContainersAttributes(&tt.pod)) + }) + } +} + func Test_extractField(t *testing.T) { c := WatchClient{} type args struct { diff --git a/processor/k8sattributesprocessor/kube/kube.go b/processor/k8sattributesprocessor/kube/kube.go index dd981746e1ef..80b938177b59 100644 --- a/processor/k8sattributesprocessor/kube/kube.go +++ b/processor/k8sattributesprocessor/kube/kube.go @@ -71,9 +71,26 @@ type Pod struct { Ignore bool Namespace string + // Containers is a map of container name to Container struct. + Containers map[string]*Container + DeletedAt time.Time } +// Container stores resource attributes for a specific container defined by k8s pod spec. +type Container struct { + ImageName string + ImageTag string + + // Statuses is a map of container run_id (restart count) attribute to ContainerStatus struct. + Statuses map[int]ContainerStatus +} + +// ContainerStatus stores resource attributes for a particular container run defined by k8s pod status. +type ContainerStatus struct { + ContainerID string +} + // Namespace represents a kubernetes namespace. type Namespace struct { Name string @@ -118,13 +135,16 @@ type FieldFilter struct { // ExtractionRules is used to specify the information that needs to be extracted // from pods and added to the spans as tags. type ExtractionRules struct { - Deployment bool - Namespace bool - PodName bool - PodUID bool - Node bool - Cluster bool - StartTime bool + Deployment bool + Namespace bool + PodName bool + PodUID bool + Node bool + Cluster bool + StartTime bool + ContainerID bool + ContainerImageName bool + ContainerImageTag bool Annotations []FieldExtractionRule Labels []FieldExtractionRule diff --git a/processor/k8sattributesprocessor/options.go b/processor/k8sattributesprocessor/options.go index 71178b781c4a..296e09d33fd1 100644 --- a/processor/k8sattributesprocessor/options.go +++ b/processor/k8sattributesprocessor/options.go @@ -78,6 +78,9 @@ func WithExtractMetadata(fields ...string) Option { conventions.AttributeK8SDeploymentName, conventions.AttributeK8SClusterName, conventions.AttributeK8SNodeName, + conventions.AttributeContainerID, + conventions.AttributeContainerImageName, + conventions.AttributeContainerImageTag, } } for _, field := range fields { @@ -99,6 +102,12 @@ func WithExtractMetadata(fields ...string) Option { p.rules.Cluster = true case metadataNode, conventions.AttributeK8SNodeName: p.rules.Node = true + case conventions.AttributeContainerID: + p.rules.ContainerID = true + case conventions.AttributeContainerImageName: + p.rules.ContainerImageName = true + case conventions.AttributeContainerImageTag: + p.rules.ContainerImageTag = true default: return fmt.Errorf("\"%s\" is not a supported metadata field", field) } diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index cb60769ad57c..2bd19b8ca591 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -16,6 +16,8 @@ package k8sattributesprocessor import ( "context" + "fmt" + "strconv" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/model/pdata" @@ -29,6 +31,10 @@ import ( const ( k8sIPLabelName string = "k8s.pod.ip" clientIPLabelName string = "ip" + + // TODO: update the label to semantic convention defined in this PR: + // https://github.com/open-telemetry/opentelemetry-specification/pull/1945 + k8sContainerRunIDLabelName string = "run_id" ) type kubernetesprocessor struct { @@ -117,9 +123,11 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pda } if podIdentifierKey != "" { - attrsToAdd := kp.getAttributesForPod(podIdentifierValue) - for key, val := range attrsToAdd { - resource.Attributes().InsertString(key, val) + if pod, ok := kp.kc.GetPod(podIdentifierValue); ok { + for key, val := range pod.Attributes { + resource.Attributes().InsertString(key, val) + } + kp.addContainerAttributes(resource.Attributes(), pod) } } @@ -131,12 +139,35 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pda } } -func (kp *kubernetesprocessor) getAttributesForPod(identifier kube.PodIdentifier) map[string]string { - pod, ok := kp.kc.GetPod(identifier) +// addContainerAttributes looks if pod has any container identifiers and adds additional container attributes +func (kp *kubernetesprocessor) addContainerAttributes(attrs pdata.AttributeMap, pod *kube.Pod) { + containerName := stringAttributeFromMap(attrs, conventions.AttributeK8SContainerName) + if containerName == "" { + return + } + containerSpec, ok := pod.Containers[containerName] if !ok { - return nil + return + } + + if containerSpec.ImageName != "" { + attrs.InsertString(conventions.AttributeContainerImageName, containerSpec.ImageName) + } + if containerSpec.ImageTag != "" { + attrs.InsertString(conventions.AttributeContainerImageTag, containerSpec.ImageTag) + } + + runIDAttr, ok := attrs.Get(k8sContainerRunIDLabelName) + if ok { + runID, err := intFromAttribute(runIDAttr) + if err == nil { + if containerStatus, ok := containerSpec.Statuses[runID]; ok && containerStatus.ContainerID != "" { + attrs.InsertString(conventions.AttributeContainerID, containerStatus.ContainerID) + } + } else { + kp.logger.Debug(err.Error()) + } } - return pod.Attributes } func (kp *kubernetesprocessor) getAttributesForPodsNamespace(namespace string) map[string]string { @@ -146,3 +177,19 @@ func (kp *kubernetesprocessor) getAttributesForPodsNamespace(namespace string) m } return ns.Attributes } + +// intFromAttribute extracts int value from an attribute stored as string or int +func intFromAttribute(val pdata.AttributeValue) (int, error) { + switch val.Type() { + case pdata.AttributeValueTypeInt: + return int(val.IntVal()), nil + case pdata.AttributeValueTypeString: + i, err := strconv.Atoi(val.StringVal()) + if err != nil { + return 0, err + } + return i, nil + default: + return 0, fmt.Errorf("wrong attribute type %v, expected int", val.Type()) + } +} diff --git a/processor/k8sattributesprocessor/processor_test.go b/processor/k8sattributesprocessor/processor_test.go index bac4301122d1..f4c0f031b8ed 100644 --- a/processor/k8sattributesprocessor/processor_test.go +++ b/processor/k8sattributesprocessor/processor_test.go @@ -283,6 +283,18 @@ func withPodUID(uid string) generateResourceFunc { } } +func withContainerName(containerName string) generateResourceFunc { + return func(res pdata.Resource) { + res.Attributes().InsertString(conventions.AttributeK8SContainerName, containerName) + } +} + +func withContainerRunID(containerRunID string) generateResourceFunc { + return func(res pdata.Resource) { + res.Attributes().InsertString(k8sContainerRunIDLabelName, containerRunID) + } +} + func TestIPDetectionFromContext(t *testing.T) { m := newMultiTest(t, NewFactory().CreateDefaultConfig(), nil) @@ -652,6 +664,146 @@ func TestProcessorAddLabels(t *testing.T) { } } +func TestProcessorAddContainerAttributes(t *testing.T) { + tests := []struct { + name string + op func(kp *kubernetesprocessor) + resourceGens []generateResourceFunc + wantAttrs map[string]string + }{ + { + name: "image-only", + op: func(kp *kubernetesprocessor) { + kp.podAssociations = []kube.Association{ + { + From: "resource_attribute", + Name: "k8s.pod.uid", + }, + } + kp.kc.(*fakeClient).Pods[kube.PodIdentifier("19f651bc-73e4-410f-b3e9-f0241679d3b8")] = &kube.Pod{ + Containers: map[string]*kube.Container{ + "app": { + ImageName: "test/app", + ImageTag: "1.0.1", + }, + }, + } + }, + resourceGens: []generateResourceFunc{ + withPodUID("19f651bc-73e4-410f-b3e9-f0241679d3b8"), + withContainerName("app"), + }, + wantAttrs: map[string]string{ + conventions.AttributeK8SPodUID: "19f651bc-73e4-410f-b3e9-f0241679d3b8", + conventions.AttributeK8SContainerName: "app", + conventions.AttributeContainerImageName: "test/app", + conventions.AttributeContainerImageTag: "1.0.1", + }, + }, + { + name: "container-id-only", + op: func(kp *kubernetesprocessor) { + kp.kc.(*fakeClient).Pods[kube.PodIdentifier("1.1.1.1")] = &kube.Pod{ + Containers: map[string]*kube.Container{ + "app": { + Statuses: map[int]kube.ContainerStatus{ + 0: {ContainerID: "fcd58c97330c1dc6615bd520031f6a703a7317cd92adc96013c4dd57daad0b5f"}, + 1: {ContainerID: "6a7f1a598b5dafec9c193f8f8d63f6e5839b8b0acd2fe780f94285e26c05580e"}, + }, + }, + }, + } + }, + resourceGens: []generateResourceFunc{ + withPassthroughIP("1.1.1.1"), + withContainerName("app"), + withContainerRunID("1"), + }, + wantAttrs: map[string]string{ + k8sIPLabelName: "1.1.1.1", + conventions.AttributeK8SContainerName: "app", + k8sContainerRunIDLabelName: "1", + conventions.AttributeContainerID: "6a7f1a598b5dafec9c193f8f8d63f6e5839b8b0acd2fe780f94285e26c05580e", + }, + }, + { + name: "container-name-mismatch", + op: func(kp *kubernetesprocessor) { + kp.kc.(*fakeClient).Pods[kube.PodIdentifier("1.1.1.1")] = &kube.Pod{ + Containers: map[string]*kube.Container{ + "app": { + ImageName: "test/app", + ImageTag: "1.0.1", + Statuses: map[int]kube.ContainerStatus{ + 0: {ContainerID: "fcd58c97330c1dc6615bd520031f6a703a7317cd92adc96013c4dd57daad0b5f"}, + }, + }, + }, + } + }, + resourceGens: []generateResourceFunc{ + withPassthroughIP("1.1.1.1"), + withContainerName("new-app"), + withContainerRunID("0"), + }, + wantAttrs: map[string]string{ + k8sIPLabelName: "1.1.1.1", + conventions.AttributeK8SContainerName: "new-app", + k8sContainerRunIDLabelName: "0", + }, + }, + { + name: "container-run-id-mismatch", + op: func(kp *kubernetesprocessor) { + kp.kc.(*fakeClient).Pods[kube.PodIdentifier("1.1.1.1")] = &kube.Pod{ + Containers: map[string]*kube.Container{ + "app": { + ImageName: "test/app", + Statuses: map[int]kube.ContainerStatus{ + 0: {ContainerID: "fcd58c97330c1dc6615bd520031f6a703a7317cd92adc96013c4dd57daad0b5f"}, + }, + }, + }, + } + }, + resourceGens: []generateResourceFunc{ + withPassthroughIP("1.1.1.1"), + withContainerName("app"), + withContainerRunID("1"), + }, + wantAttrs: map[string]string{ + k8sIPLabelName: "1.1.1.1", + conventions.AttributeK8SContainerName: "app", + k8sContainerRunIDLabelName: "1", + conventions.AttributeContainerImageName: "test/app", + }, + }, + } + + for _, tt := range tests { + m := newMultiTest( + t, + NewFactory().CreateDefaultConfig(), + nil, + ) + m.kubernetesProcessorOperation(tt.op) + m.testConsume(context.Background(), + generateTraces(tt.resourceGens...), + generateMetrics(tt.resourceGens...), + generateLogs(tt.resourceGens...), + nil, + ) + + m.assertBatchesLen(1) + m.assertResource(0, func(r pdata.Resource) { + require.Equal(t, len(tt.wantAttrs), r.Attributes().Len()) + for k, v := range tt.wantAttrs { + assertResourceHasStringAttribute(t, r, k, v) + } + }) + } +} + func TestProcessorPicksUpPassthoughPodIp(t *testing.T) { m := newMultiTest( t, @@ -908,3 +1060,48 @@ func assertResourceHasStringAttribute(t *testing.T, r pdata.Resource, k, v strin assert.EqualValues(t, pdata.AttributeValueTypeString, got.Type(), "attribute %s is not of type string", k) assert.EqualValues(t, v, got.StringVal(), "attribute %s is not equal to %s", k, v) } + +func Test_intFromAttribute(t *testing.T) { + tests := []struct { + name string + attrVal pdata.AttributeValue + wantInt int + wantErr bool + }{ + { + name: "wrong-type", + attrVal: pdata.NewAttributeValueBool(true), + wantInt: 0, + wantErr: true, + }, + { + name: "wrong-string-number", + attrVal: pdata.NewAttributeValueString("NaN"), + wantInt: 0, + wantErr: true, + }, + { + name: "valid-string-number", + attrVal: pdata.NewAttributeValueString("3"), + wantInt: 3, + wantErr: false, + }, + { + name: "valid-int-number", + attrVal: pdata.NewAttributeValueInt(1), + wantInt: 1, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := intFromAttribute(tt.attrVal) + assert.Equal(t, tt.wantInt, got) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +}