Skip to content

Commit

Permalink
[processor/k8sattribute] Support adding labels and annotations from n…
Browse files Browse the repository at this point in the history
…ode (#28570)

**Description:**
support adding labels and annotations from the node as additional
resource attributes on telemetry processed through the `k8sattributes`
processor.

**Link to tracking Issue:** Resolve  #22620

---------

Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com>
  • Loading branch information
haoqixu and TylerHelmuth authored Oct 31, 2023
1 parent c50daa9 commit 49438fe
Show file tree
Hide file tree
Showing 17 changed files with 536 additions and 18 deletions.
27 changes: 27 additions & 0 deletions .chloggen/feat-22620.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/k8sattribute

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: support adding labels and annotations from node

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [22620]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
5 changes: 5 additions & 0 deletions .github/workflows/configs/e2e-kind-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@ kubeadmConfigPatches:
- |
kind: KubeletConfiguration
serverTLSBootstrap: true
nodes:
- role: control-plane
labels:
# used in k8sattributesprocessor e2e test
foo: too
17 changes: 12 additions & 5 deletions processor/k8sattributesprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ Additional container level attributes can be extracted provided that certain res
instance. If it's not set, the latest container instance will be used:
- container.id (not added by default, has to be specified in `metadata`)

The k8sattributesprocessor can also set resource attributes from k8s labels and annotations of pods and namespaces.
The config for associating the data passing through the processor (spans, metrics and logs) with specific Pod/Namespace annotations/labels is configured via "annotations" and "labels" keys.
This config represents a list of annotations/labels that are extracted from pods/namespaces and added to spans, metrics and logs.
The k8sattributesprocessor can also set resource attributes from k8s labels and annotations of pods, namespaces and nodes.
The config for associating the data passing through the processor (spans, metrics and logs) with specific Pod/Namespace/Node annotations/labels is configured via "annotations" and "labels" keys.
This config represents a list of annotations/labels that are extracted from pods/namespaces/nodes and added to spans, metrics and logs.
Each item is specified as a config of tag_name (representing the tag name to tag the spans with),
key (representing the key used to extract value) and from (representing the kubernetes object used to extract the value).
The "from" field has only two possible values "pod" and "namespace" and defaults to "pod" if none is specified.
The "from" field has only three possible values "pod", "namespace" and "node" and defaults to "pod" if none is specified.

A few examples to use this config are as follows:

Expand All @@ -106,6 +106,10 @@ annotations:
key: annotation-two
regex: field=(?P<value>.+)
from: namespace
- tag_name: a3 # extracts value of annotation from nodes with key `annotation-three` with regexp and inserts it as a tag with key `a3`
key: annotation-three
regex: field=(?P<value>.+)
from: node

labels:
- tag_name: l1 # extracts value of label from namespaces with key `label1` and inserts it as a tag with key `l1`
Expand All @@ -115,6 +119,9 @@ labels:
key: label2
regex: field=(?P<value>.+)
from: pod
- tag_name: l3 # extracts value of label from nodes with key `label3` and inserts it as a tag with key `l3`
key: label3
from: node
```
### Config example
Expand Down Expand Up @@ -147,7 +154,7 @@ k8sattributes/2:
## Role-based access control
The k8sattributesprocessor needs `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.uid` or `k8s.deployment.name` the processor also needs `get`, `watch` and `list` permissions for `replicaset` resources.
The k8sattributesprocessor needs `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.uid` or `k8s.deployment.name` the processor also needs `get`, `watch` and `list` permissions for `replicaset` resources. When extracting metadatas from `node`, the processor needs `get`, `watch` and `list` permissions for `node` resources.

Here is an example of a `ClusterRole` to give a `ServiceAccount` the necessary permissions for all pods and namespaces in the cluster (replace `<OTEL_COL_NAMESPACE>` with a namespace where collector is deployed):

Expand Down
8 changes: 8 additions & 0 deletions processor/k8sattributesprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ type fakeClient struct {
Informer cache.SharedInformer
NamespaceInformer cache.SharedInformer
ReplicaSetInformer cache.SharedInformer
NodeInformer cache.SharedInformer
Namespaces map[string]*kube.Namespace
Nodes map[string]*kube.Node
StopCh chan struct{}
}

Expand All @@ -44,6 +46,7 @@ func newFakeClient(_ *zap.Logger, _ k8sconfig.APIConfig, rules kube.ExtractionRu
Associations: associations,
Informer: kube.NewFakeInformer(cs, "", ls, fs),
NamespaceInformer: kube.NewFakeInformer(cs, "", ls, fs),
NodeInformer: kube.NewFakeInformer(cs, "", ls, fs),
ReplicaSetInformer: kube.NewFakeInformer(cs, "", ls, fs),
StopCh: make(chan struct{}),
}, nil
Expand All @@ -61,6 +64,11 @@ func (f *fakeClient) GetNamespace(namespace string) (*kube.Namespace, bool) {
return ns, ok
}

func (f *fakeClient) GetNode(nodeName string) (*kube.Node, bool) {
node, ok := f.Nodes[nodeName]
return node, ok
}

// Start is a noop for FakeClient.
func (f *fakeClient) Start() {
if f.Informer != nil {
Expand Down
6 changes: 3 additions & 3 deletions processor/k8sattributesprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func (cfg *Config) Validate() error {
}

switch f.From {
case "", kube.MetadataFromPod, kube.MetadataFromNamespace:
case "", kube.MetadataFromPod, kube.MetadataFromNamespace, kube.MetadataFromNode:
default:
return fmt.Errorf("%s is not a valid choice for From. Must be one of: pod, namespace", f.From)
return fmt.Errorf("%s is not a valid choice for From. Must be one of: pod, namespace, node", f.From)
}

if f.Regex != "" {
Expand Down Expand Up @@ -117,7 +117,7 @@ func (cfg *Config) Validate() error {
// ExtractConfig section allows specifying extraction rules to extract
// data from k8s pod specs.
type ExtractConfig struct {
// Metadata allows to extract pod/namespace metadata from a list of metadata fields.
// Metadata allows to extract pod/namespace/node metadata from a list of metadata fields.
// The field accepts a list of strings.
//
// Metadata fields supported right now are,
Expand Down
9 changes: 9 additions & 0 deletions processor/k8sattributesprocessor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func TestE2E(t *testing.T) {
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
},
},
{
Expand All @@ -129,6 +130,7 @@ func TestE2E(t *testing.T) {
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
},
},
{
Expand Down Expand Up @@ -175,6 +177,7 @@ func TestE2E(t *testing.T) {
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
},
},
{
Expand All @@ -197,6 +200,7 @@ func TestE2E(t *testing.T) {
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
},
},
{
Expand All @@ -219,6 +223,7 @@ func TestE2E(t *testing.T) {
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
},
},
{
Expand Down Expand Up @@ -265,6 +270,7 @@ func TestE2E(t *testing.T) {
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
},
},
{
Expand All @@ -287,6 +293,7 @@ func TestE2E(t *testing.T) {
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
},
},
{
Expand Down Expand Up @@ -333,6 +340,7 @@ func TestE2E(t *testing.T) {
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
},
},
{
Expand All @@ -355,6 +363,7 @@ func TestE2E(t *testing.T) {
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
},
},
}
Expand Down
109 changes: 109 additions & 0 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type WatchClient struct {
kc kubernetes.Interface
informer cache.SharedInformer
namespaceInformer cache.SharedInformer
nodeInformer cache.SharedInformer
replicasetInformer cache.SharedInformer
replicasetRegex *regexp.Regexp
cronJobRegex *regexp.Regexp
Expand All @@ -60,6 +61,10 @@ type WatchClient struct {
// Key is namespace name
Namespaces map[string]*Namespace

// A map containing Node related data, used to associate them with resources.
// Key is node name
Nodes map[string]*Node

// A map containing ReplicaSets related data, used to associate them with resources.
// Key is replicaset uid
ReplicaSets map[string]*ReplicaSet
Expand Down Expand Up @@ -89,6 +94,7 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,

c.Pods = map[PodIdentifier]*Pod{}
c.Namespaces = map[string]*Namespace{}
c.Nodes = map[string]*Node{}
c.ReplicaSets = map[string]*ReplicaSet{}
if newClientSet == nil {
newClientSet = k8sconfig.MakeClient
Expand Down Expand Up @@ -162,6 +168,10 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
}
}

if c.extractNodeLabelsAnnotations() {
c.nodeInformer = newNodeSharedInformer(c.kc, c.Filters.Node)
}

return c, err
}

Expand Down Expand Up @@ -198,6 +208,18 @@ func (c *WatchClient) Start() {
}
go c.replicasetInformer.Run(c.stopCh)
}

if c.nodeInformer != nil {
_, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleNodeAdd,
UpdateFunc: c.handleNodeUpdate,
DeleteFunc: c.handleNodeDelete,
})
if err != nil {
c.logger.Error("error adding event handler to node informer", zap.Error(err))
}
go c.nodeInformer.Run(c.stopCh)
}
}

// Stop signals the the k8s watcher/informer to stop watching for new events.
Expand Down Expand Up @@ -273,6 +295,37 @@ func (c *WatchClient) handleNamespaceDelete(obj interface{}) {
}
}

func (c *WatchClient) handleNodeAdd(obj interface{}) {
observability.RecordNodeAdded()
if node, ok := obj.(*api_v1.Node); ok {
c.addOrUpdateNode(node)
} else {
c.logger.Error("object received was not of type api_v1.Node", zap.Any("received", obj))
}
}

func (c *WatchClient) handleNodeUpdate(_, newNode interface{}) {
observability.RecordNodeUpdated()
if node, ok := newNode.(*api_v1.Node); ok {
c.addOrUpdateNode(node)
} else {
c.logger.Error("object received was not of type api_v1.Node", zap.Any("received", newNode))
}
}

func (c *WatchClient) handleNodeDelete(obj interface{}) {
observability.RecordNodeDeleted()
if node, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Node); ok {
c.m.Lock()
if n, ok := c.Nodes[node.Name]; ok {
delete(c.Nodes, n.Name)
}
c.m.Unlock()
} else {
c.logger.Error("object received was not of type api_v1.Node", zap.Any("received", obj))
}
}

func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Duration) {
// This loop runs after N seconds and deletes pods from cache.
// It iterates over the delete queue and deletes all that aren't
Expand Down Expand Up @@ -339,6 +392,17 @@ func (c *WatchClient) GetNamespace(namespace string) (*Namespace, bool) {
return nil, false
}

// GetNode takes a node name and returns the node object the node name is associated with.
func (c *WatchClient) GetNode(nodeName string) (*Node, bool) {
c.m.RLock()
node, ok := c.Nodes[nodeName]
c.m.RUnlock()
if ok {
return node, ok
}
return nil, false
}

func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
tags := map[string]string{}
if c.Rules.PodName {
Expand Down Expand Up @@ -614,10 +678,25 @@ func (c *WatchClient) extractNamespaceAttributes(namespace *api_v1.Namespace) ma
return tags
}

func (c *WatchClient) extractNodeAttributes(node *api_v1.Node) map[string]string {
tags := map[string]string{}

for _, r := range c.Rules.Labels {
r.extractFromNodeMetadata(node.Labels, tags, "k8s.node.labels.%s")
}

for _, r := range c.Rules.Annotations {
r.extractFromNodeMetadata(node.Annotations, tags, "k8s.node.annotations.%s")
}

return tags
}

func (c *WatchClient) podFromAPI(pod *api_v1.Pod) *Pod {
newPod := &Pod{
Name: pod.Name,
Namespace: pod.GetNamespace(),
NodeName: pod.Spec.NodeName,
Address: pod.Status.PodIP,
HostNetwork: pod.Spec.HostNetwork,
PodUID: string(pod.UID),
Expand Down Expand Up @@ -832,6 +911,36 @@ func (c *WatchClient) extractNamespaceLabelsAnnotations() bool {
return false
}

func (c *WatchClient) extractNodeLabelsAnnotations() bool {
for _, r := range c.Rules.Labels {
if r.From == MetadataFromNode {
return true
}
}

for _, r := range c.Rules.Annotations {
if r.From == MetadataFromNode {
return true
}
}

return false
}

func (c *WatchClient) addOrUpdateNode(node *api_v1.Node) {
newNode := &Node{
Name: node.Name,
NodeUID: string(node.UID),
}
newNode.Attributes = c.extractNodeAttributes(node)

c.m.Lock()
if node.Name != "" {
c.Nodes[node.Name] = newNode
}
c.m.Unlock()
}

func needContainerAttributes(rules ExtractionRules) bool {
return rules.ContainerImageName ||
rules.ContainerName ||
Expand Down
Loading

0 comments on commit 49438fe

Please sign in to comment.