diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4d88ecc6b365..2d12ebd4d9fa 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -153,6 +153,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif - Populate new container ECS fields in Kubernetes module. {pull}30181[30181] - Populate ecs container fields in Containerd module. {pull}31025[31025] - Enhance Oracle Module: Change tablespace metricset collection period {issue}30948[30948] {pull}31259[#31259] +- Add orchestrator cluster ECS fields in kubernetes events {pull}31341[31341] *Packetbeat* diff --git a/metricbeat/module/kubernetes/event/event.go b/metricbeat/module/kubernetes/event/event.go index c0ac28fb0c51..559e2151a5d9 100644 --- a/metricbeat/module/kubernetes/event/event.go +++ b/metricbeat/module/kubernetes/event/event.go @@ -21,8 +21,11 @@ import ( "fmt" "time" + k8sclient "k8s.io/client-go/kubernetes" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" "github.com/elastic/beats/v7/libbeat/common/safemapstr" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" @@ -44,6 +47,7 @@ type MetricSet struct { watchOptions kubernetes.WatchOptions dedotConfig dedotConfig skipOlder bool + clusterMeta common.MapStr } // dedotConfig defines LabelsDedot and AnnotationsDedot. @@ -85,13 +89,40 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { AnnotationsDedot: config.AnnotationsDedot, } - return &MetricSet{ + ms := &MetricSet{ BaseMetricSet: base, dedotConfig: dedotConfig, watcher: watcher, watchOptions: watchOptions, skipOlder: config.SkipOlder, - }, nil + } + + // add ECS orchestrator fields + cfg, _ := common.NewConfigFrom(&config) + ecsClusterMeta, err := getClusterECSMeta(cfg, client, ms.Logger()) + if err != nil { + ms.Logger().Debugf("could not retrieve cluster metadata: %w", err) + } + if ecsClusterMeta != nil { + ms.clusterMeta = ecsClusterMeta + } + + return ms, nil +} + +func getClusterECSMeta(cfg *common.Config, client k8sclient.Interface, logger *logp.Logger) (common.MapStr, error) { + clusterInfo, err := metadata.GetKubernetesClusterIdentifier(cfg, client) + if err != nil { + return nil, fmt.Errorf("fail to get kubernetes cluster metadata: %w", err) + } + ecsClusterMeta := common.MapStr{} + if clusterInfo.Url != "" { + util.ShouldPut(ecsClusterMeta, "orchestrator.cluster.url", clusterInfo.Url, logger) + } + if clusterInfo.Name != "" { + util.ShouldPut(ecsClusterMeta, "orchestrator.cluster.name", clusterInfo.Name, logger) + } + return ecsClusterMeta, nil } // Run method provides the Kubernetes event watcher with a reporter with which events can be reported. @@ -99,12 +130,10 @@ func (m *MetricSet) Run(reporter mb.PushReporterV2) { now := time.Now() handler := kubernetes.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - mapStrEvent := generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig, m.Logger()) - reporter.Event(mb.TransformMapStrToEvent("kubernetes", mapStrEvent, nil)) + m.reportEvent(obj, reporter) }, UpdateFunc: func(obj interface{}) { - mapStrEvent := generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig, m.Logger()) - reporter.Event(mb.TransformMapStrToEvent("kubernetes", mapStrEvent, nil)) + m.reportEvent(obj, reporter) }, // ignore events that are deleted DeleteFunc: nil, @@ -140,6 +169,15 @@ func (m *MetricSet) Run(reporter mb.PushReporterV2) { m.watcher.Stop() } +func (m *MetricSet) reportEvent(obj interface{}, reporter mb.PushReporterV2) { + mapStrEvent := generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig, m.Logger()) + event := mb.TransformMapStrToEvent("kubernetes", mapStrEvent, nil) + if m.clusterMeta != nil { + event.RootFields.DeepUpdate(m.clusterMeta) + } + reporter.Event(event) +} + func generateMapStrFromEvent(eve *kubernetes.Event, dedotConfig dedotConfig, logger *logp.Logger) common.MapStr { eventMeta := common.MapStr{ "timestamp": common.MapStr{