From 80ba1dda548378894ad150a47d95c4c358d40e96 Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 18 Apr 2022 13:09:16 +0100 Subject: [PATCH 1/4] Add orchestrator ECS fields in k8s events Signed-off-by: chrismark --- metricbeat/module/kubernetes/event/event.go | 44 +++++++++++++++++++-- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/metricbeat/module/kubernetes/event/event.go b/metricbeat/module/kubernetes/event/event.go index c0ac28fb0c51..66150229dbd1 100644 --- a/metricbeat/module/kubernetes/event/event.go +++ b/metricbeat/module/kubernetes/event/event.go @@ -19,10 +19,12 @@ package event import ( "fmt" + k8sclient "k8s.io/client-go/kubernetes" "time" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" "github.com/elastic/beats/v7/libbeat/common/safemapstr" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" @@ -44,6 +46,7 @@ type MetricSet struct { watchOptions kubernetes.WatchOptions dedotConfig dedotConfig skipOlder bool + clusterMeta common.MapStr } // dedotConfig defines LabelsDedot and AnnotationsDedot. @@ -85,13 +88,38 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { AnnotationsDedot: config.AnnotationsDedot, } - return &MetricSet{ + + ms := &MetricSet{ BaseMetricSet: base, dedotConfig: dedotConfig, watcher: watcher, watchOptions: watchOptions, skipOlder: config.SkipOlder, - }, nil + } + + // add ECS orchestrator fields + cfg, _ := common.NewConfigFrom(&config) + ecsClusterMeta, err := getClusterECSMeta(cfg, client) + if ecsClusterMeta != nil { + ms.clusterMeta = ecsClusterMeta + } + + return ms, nil +} + +func getClusterECSMeta(cfg *common.Config, client k8sclient.Interface) (common.MapStr, error) { + clusterInfo, err := metadata.GetKubernetesClusterIdentifier(cfg, client) + if err != nil { + return nil, fmt.Errorf("fail to init kubernetes watcher: %w", err) + } + ecsClusterMeta := common.MapStr{} + if clusterInfo.Url != "" { + ecsClusterMeta.Put("orchestrator.cluster.url", clusterInfo.Url) + } + if clusterInfo.Name != "" { + ecsClusterMeta.Put("orchestrator.cluster.name", clusterInfo.Name) + } + return ecsClusterMeta, nil } // Run method provides the Kubernetes event watcher with a reporter with which events can be reported. @@ -100,11 +128,19 @@ func (m *MetricSet) Run(reporter mb.PushReporterV2) { handler := kubernetes.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { mapStrEvent := generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig, m.Logger()) - reporter.Event(mb.TransformMapStrToEvent("kubernetes", mapStrEvent, nil)) + event := mb.TransformMapStrToEvent("kubernetes", mapStrEvent, nil) + if m.clusterMeta != nil { + event.RootFields.DeepUpdate(m.clusterMeta) + } + reporter.Event(event) }, UpdateFunc: func(obj interface{}) { mapStrEvent := generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig, m.Logger()) - reporter.Event(mb.TransformMapStrToEvent("kubernetes", mapStrEvent, nil)) + event := mb.TransformMapStrToEvent("kubernetes", mapStrEvent, nil) + if m.clusterMeta != nil { + event.RootFields.DeepUpdate(m.clusterMeta) + } + reporter.Event(event) }, // ignore events that are deleted DeleteFunc: nil, From 353f11c4eeee8b8148e83a58e8022e0cba54763b Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 19 Apr 2022 09:54:01 +0100 Subject: [PATCH 2/4] add changelog + fmt Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 2 ++ metricbeat/module/kubernetes/event/event.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4d88ecc6b365..beba55afdd16 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -153,6 +153,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif - Populate new container ECS fields in Kubernetes module. {pull}30181[30181] - Populate ecs container fields in Containerd module. {pull}31025[31025] - Enhance Oracle Module: Change tablespace metricset collection period {issue}30948[30948] {pull}31259[#31259] +- Add orchestrator cluster ECS fields in kubernetes events {pull}31341[31341] + *Packetbeat* diff --git a/metricbeat/module/kubernetes/event/event.go b/metricbeat/module/kubernetes/event/event.go index 66150229dbd1..8fb690d930ba 100644 --- a/metricbeat/module/kubernetes/event/event.go +++ b/metricbeat/module/kubernetes/event/event.go @@ -19,9 +19,10 @@ package event import ( "fmt" - k8sclient "k8s.io/client-go/kubernetes" "time" + k8sclient "k8s.io/client-go/kubernetes" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" @@ -88,7 +89,6 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { AnnotationsDedot: config.AnnotationsDedot, } - ms := &MetricSet{ BaseMetricSet: base, dedotConfig: dedotConfig, From 0b8830f720fd3e885753208cc207d8b1f1f94c36 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 19 Apr 2022 10:17:30 +0100 Subject: [PATCH 3/4] code improvements Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 1 - metricbeat/module/kubernetes/event/event.go | 28 +++++++++++---------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index beba55afdd16..2d12ebd4d9fa 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -155,7 +155,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif - Enhance Oracle Module: Change tablespace metricset collection period {issue}30948[30948] {pull}31259[#31259] - Add orchestrator cluster ECS fields in kubernetes events {pull}31341[31341] - *Packetbeat* diff --git a/metricbeat/module/kubernetes/event/event.go b/metricbeat/module/kubernetes/event/event.go index 8fb690d930ba..f2c529091182 100644 --- a/metricbeat/module/kubernetes/event/event.go +++ b/metricbeat/module/kubernetes/event/event.go @@ -100,6 +100,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // add ECS orchestrator fields cfg, _ := common.NewConfigFrom(&config) ecsClusterMeta, err := getClusterECSMeta(cfg, client) + if err != nil { + ms.Logger().Debugf("could not retrieve cluster metadata: %w", err) + } if ecsClusterMeta != nil { ms.clusterMeta = ecsClusterMeta } @@ -110,7 +113,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func getClusterECSMeta(cfg *common.Config, client k8sclient.Interface) (common.MapStr, error) { clusterInfo, err := metadata.GetKubernetesClusterIdentifier(cfg, client) if err != nil { - return nil, fmt.Errorf("fail to init kubernetes watcher: %w", err) + return nil, fmt.Errorf("fail to get kubernetes cluster metadata: %w", err) } ecsClusterMeta := common.MapStr{} if clusterInfo.Url != "" { @@ -127,20 +130,10 @@ func (m *MetricSet) Run(reporter mb.PushReporterV2) { now := time.Now() handler := kubernetes.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - mapStrEvent := generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig, m.Logger()) - event := mb.TransformMapStrToEvent("kubernetes", mapStrEvent, nil) - if m.clusterMeta != nil { - event.RootFields.DeepUpdate(m.clusterMeta) - } - reporter.Event(event) + m.reportEvent(obj, reporter) }, UpdateFunc: func(obj interface{}) { - mapStrEvent := generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig, m.Logger()) - event := mb.TransformMapStrToEvent("kubernetes", mapStrEvent, nil) - if m.clusterMeta != nil { - event.RootFields.DeepUpdate(m.clusterMeta) - } - reporter.Event(event) + m.reportEvent(obj, reporter) }, // ignore events that are deleted DeleteFunc: nil, @@ -176,6 +169,15 @@ func (m *MetricSet) Run(reporter mb.PushReporterV2) { m.watcher.Stop() } +func (m *MetricSet) reportEvent(obj interface{}, reporter mb.PushReporterV2) { + mapStrEvent := generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig, m.Logger()) + event := mb.TransformMapStrToEvent("kubernetes", mapStrEvent, nil) + if m.clusterMeta != nil { + event.RootFields.DeepUpdate(m.clusterMeta) + } + reporter.Event(event) +} + func generateMapStrFromEvent(eve *kubernetes.Event, dedotConfig dedotConfig, logger *logp.Logger) common.MapStr { eventMeta := common.MapStr{ "timestamp": common.MapStr{ From 8e45163086343c55ba1ce6146654f10d80adfd14 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 19 Apr 2022 11:27:17 +0100 Subject: [PATCH 4/4] fix lint issues Signed-off-by: chrismark --- metricbeat/module/kubernetes/event/event.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/metricbeat/module/kubernetes/event/event.go b/metricbeat/module/kubernetes/event/event.go index f2c529091182..559e2151a5d9 100644 --- a/metricbeat/module/kubernetes/event/event.go +++ b/metricbeat/module/kubernetes/event/event.go @@ -99,7 +99,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // add ECS orchestrator fields cfg, _ := common.NewConfigFrom(&config) - ecsClusterMeta, err := getClusterECSMeta(cfg, client) + ecsClusterMeta, err := getClusterECSMeta(cfg, client, ms.Logger()) if err != nil { ms.Logger().Debugf("could not retrieve cluster metadata: %w", err) } @@ -110,17 +110,17 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return ms, nil } -func getClusterECSMeta(cfg *common.Config, client k8sclient.Interface) (common.MapStr, error) { +func getClusterECSMeta(cfg *common.Config, client k8sclient.Interface, logger *logp.Logger) (common.MapStr, error) { clusterInfo, err := metadata.GetKubernetesClusterIdentifier(cfg, client) if err != nil { return nil, fmt.Errorf("fail to get kubernetes cluster metadata: %w", err) } ecsClusterMeta := common.MapStr{} if clusterInfo.Url != "" { - ecsClusterMeta.Put("orchestrator.cluster.url", clusterInfo.Url) + util.ShouldPut(ecsClusterMeta, "orchestrator.cluster.url", clusterInfo.Url, logger) } if clusterInfo.Name != "" { - ecsClusterMeta.Put("orchestrator.cluster.name", clusterInfo.Name) + util.ShouldPut(ecsClusterMeta, "orchestrator.cluster.name", clusterInfo.Name, logger) } return ecsClusterMeta, nil }