From 1ac672fce0764edb4db9c287d234e27be7400322 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Thu, 11 Jul 2024 11:31:11 +0200 Subject: [PATCH] handle dropped events in a common manner Signed-off-by: Matthias Bertschy --- pkg/containerwatcher/v1/capabilities.go | 16 ++++++-- pkg/containerwatcher/v1/common.go | 13 +++++++ pkg/containerwatcher/v1/container_watcher.go | 12 ++---- pkg/containerwatcher/v1/dns.go | 10 +++-- pkg/containerwatcher/v1/exec.go | 20 +++++----- pkg/containerwatcher/v1/hardlink.go | 19 ++++++---- pkg/containerwatcher/v1/network.go | 40 ++++++++------------ pkg/containerwatcher/v1/open.go | 22 +++++------ pkg/containerwatcher/v1/randomx.go | 17 +++++---- pkg/containerwatcher/v1/symlink.go | 19 ++++++---- 10 files changed, 104 insertions(+), 84 deletions(-) create mode 100644 pkg/containerwatcher/v1/common.go diff --git a/pkg/containerwatcher/v1/capabilities.go b/pkg/containerwatcher/v1/capabilities.go index cef0f57c..955ed112 100644 --- a/pkg/containerwatcher/v1/capabilities.go +++ b/pkg/containerwatcher/v1/capabilities.go @@ -11,12 +11,16 @@ import ( ) func (ch *IGContainerWatcher) capabilitiesEventCallback(event *tracercapabilitiestype.Event) { - if event.Type != types.NORMAL { - // dropped event + if event.Type == types.DEBUG { + return + } + + if isDroppedEvent(event.Type, event.Message) { logger.L().Ctx(ch.ctx).Warning("capabilities tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message)) return } - _ = ch.capabilitiesWorkerPool.Invoke(*event) + + ch.capabilitiesWorkerChan <- event } func (ch *IGContainerWatcher) startCapabilitiesTracing() error { @@ -34,6 +38,12 @@ func (ch *IGContainerWatcher) startCapabilitiesTracing() error { if err != nil { return fmt.Errorf("creating tracer: %w", err) } + go func() { + for event := range ch.capabilitiesWorkerChan { + _ = ch.capabilitiesWorkerPool.Invoke(*event) + } + }() + ch.capabilitiesTracer = tracerCapabilities return nil diff --git a/pkg/containerwatcher/v1/common.go b/pkg/containerwatcher/v1/common.go new file mode 100644 index 00000000..fd136877 --- /dev/null +++ b/pkg/containerwatcher/v1/common.go @@ -0,0 +1,13 @@ +package containerwatcher + +import ( + "strings" + + "github.com/inspektor-gadget/inspektor-gadget/pkg/types" +) + +func isDroppedEvent(eventType types.EventType, message string) bool { + return eventType != types.NORMAL && + eventType != types.DEBUG && + strings.Contains(message, "stop tracing container") +} diff --git a/pkg/containerwatcher/v1/container_watcher.go b/pkg/containerwatcher/v1/container_watcher.go index 4170d744..0adcb016 100644 --- a/pkg/containerwatcher/v1/container_watcher.go +++ b/pkg/containerwatcher/v1/container_watcher.go @@ -21,7 +21,6 @@ import ( "github.com/inspektor-gadget/inspektor-gadget/pkg/operators" "github.com/inspektor-gadget/inspektor-gadget/pkg/socketenricher" tracercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/tracer-collection" - "github.com/inspektor-gadget/inspektor-gadget/pkg/types" "github.com/kubescape/go-logger" "github.com/kubescape/go-logger/helpers" "github.com/kubescape/k8s-interface/k8sinterface" @@ -165,8 +164,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli k8sContainerID := utils.CreateK8sContainerID(event.K8s.Namespace, event.K8s.PodName, event.K8s.ContainerName) - // dropped events - if event.Type != types.NORMAL { + if isDroppedEvent(event.Type, event.Message) { applicationProfileManager.ReportDroppedEvent(k8sContainerID) return } @@ -193,8 +191,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli } k8sContainerID := utils.CreateK8sContainerID(event.K8s.Namespace, event.K8s.PodName, event.K8s.ContainerName) - // dropped events - if event.Type != types.NORMAL { + if isDroppedEvent(event.Type, event.Message) { applicationProfileManager.ReportDroppedEvent(k8sContainerID) return } @@ -222,8 +219,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli } k8sContainerID := utils.CreateK8sContainerID(event.K8s.Namespace, event.K8s.PodName, event.K8s.ContainerName) - // dropped events - if event.Type != types.NORMAL { + if isDroppedEvent(event.Type, event.Message) { networkManagerv1Client.ReportDroppedEvent(event.Runtime.ContainerID, event) networkManagerClient.ReportDroppedEvent(k8sContainerID) return @@ -282,7 +278,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli return nil, fmt.Errorf("creating symlink worker pool: %w", err) } // Create a hardlink worker pool - hardlinkWorkerPool, err := ants.NewPoolWithFunc(symlinkWorkerPoolSize, func(i interface{}) { + hardlinkWorkerPool, err := ants.NewPoolWithFunc(hardlinkWorkerPoolSize, func(i interface{}) { event := i.(tracerhardlinktype.Event) if event.K8s.ContainerName == "" { return diff --git a/pkg/containerwatcher/v1/dns.go b/pkg/containerwatcher/v1/dns.go index c68a42d2..d2bb2bd7 100644 --- a/pkg/containerwatcher/v1/dns.go +++ b/pkg/containerwatcher/v1/dns.go @@ -12,7 +12,11 @@ import ( ) func (ch *IGContainerWatcher) dnsEventCallback(event *tracerdnstype.Event) { - if event.Type != types.NORMAL && event.Type != types.DEBUG { + if event.Type == types.DEBUG { + return + } + + if isDroppedEvent(event.Type, event.Message) { logger.L().Ctx(ch.ctx).Warning("dns tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message)) return } @@ -34,7 +38,7 @@ func (ch *IGContainerWatcher) startDNSTracing() error { } go func() { for event := range ch.dnsWorkerChan { - ch.dnsWorkerPool.Invoke(*event) + _ = ch.dnsWorkerPool.Invoke(*event) } }() @@ -68,8 +72,6 @@ func (ch *IGContainerWatcher) stopDNSTracing() error { if err := ch.tracerCollection.RemoveTracer(dnsTraceName); err != nil { return fmt.Errorf("removing tracer: %w", err) } - ch.dnsTracer.Close() - return nil } diff --git a/pkg/containerwatcher/v1/exec.go b/pkg/containerwatcher/v1/exec.go index 0f3113e7..136e1325 100644 --- a/pkg/containerwatcher/v1/exec.go +++ b/pkg/containerwatcher/v1/exec.go @@ -6,15 +6,15 @@ import ( tracerexec "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/exec/tracer" tracerexectype "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/exec/types" "github.com/inspektor-gadget/inspektor-gadget/pkg/types" - "github.com/kubescape/go-logger" - "github.com/kubescape/go-logger/helpers" ) func (ch *IGContainerWatcher) execEventCallback(event *tracerexectype.Event) { - if event.Type != types.NORMAL { - // dropped event - logger.L().Ctx(ch.ctx).Warning("exec tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message)) + if event.Type == types.DEBUG { + return } + + // do not skip dropped events as their processing is done in the worker + if event.Retval > -1 && event.Comm != "" { ch.execWorkerChan <- event } @@ -31,16 +31,16 @@ func (ch *IGContainerWatcher) startExecTracing() error { return fmt.Errorf("getting execMountnsmap: %w", err) } + tracerExec, err := tracerexec.NewTracer(&tracerexec.Config{MountnsMap: execMountnsmap, GetPaths: true}, ch.containerCollection, ch.execEventCallback) + if err != nil { + return fmt.Errorf("creating tracer: %w", err) + } go func() { for event := range ch.execWorkerChan { - ch.execWorkerPool.Invoke(*event) + _ = ch.execWorkerPool.Invoke(*event) } }() - tracerExec, err := tracerexec.NewTracer(&tracerexec.Config{MountnsMap: execMountnsmap, GetPaths: true}, ch.containerCollection, ch.execEventCallback) - if err != nil { - return fmt.Errorf("creating tracer: %w", err) - } ch.execTracer = tracerExec return nil diff --git a/pkg/containerwatcher/v1/hardlink.go b/pkg/containerwatcher/v1/hardlink.go index 8178dbce..2bce183a 100644 --- a/pkg/containerwatcher/v1/hardlink.go +++ b/pkg/containerwatcher/v1/hardlink.go @@ -12,8 +12,11 @@ import ( ) func (ch *IGContainerWatcher) hardlinkEventCallback(event *tracerhardlinktype.Event) { - if event.Type != types.NORMAL { - // dropped event + if event.Type == types.DEBUG { + return + } + + if isDroppedEvent(event.Type, event.Message) { logger.L().Ctx(ch.ctx).Warning("hardlink tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message)) return } @@ -32,17 +35,17 @@ func (ch *IGContainerWatcher) startHardlinkTracing() error { return fmt.Errorf("getting hardlinkMountnsmap: %w", err) } + tracerHardlink, err := tracerhardlink.NewTracer(&tracerhardlink.Config{MountnsMap: hardlinkMountnsmap}, ch.containerCollection, ch.hardlinkEventCallback) + if err != nil { + return fmt.Errorf("creating tracer: %w", err) + } go func() { for event := range ch.hardlinkWorkerChan { - ch.hardlinkWorkerPool.Invoke(*event) + _ = ch.hardlinkWorkerPool.Invoke(*event) } }() - tracerhardlink, err := tracerhardlink.NewTracer(&tracerhardlink.Config{MountnsMap: hardlinkMountnsmap}, ch.containerCollection, ch.hardlinkEventCallback) - if err != nil { - return fmt.Errorf("creating tracer: %w", err) - } - ch.hardlinkTracer = tracerhardlink + ch.hardlinkTracer = tracerHardlink return nil } diff --git a/pkg/containerwatcher/v1/network.go b/pkg/containerwatcher/v1/network.go index 4eff8642..cdb8b414 100644 --- a/pkg/containerwatcher/v1/network.go +++ b/pkg/containerwatcher/v1/network.go @@ -2,7 +2,6 @@ package containerwatcher import ( "fmt" - "strings" "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection/networktracer" tracernetwork "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/network/tracer" @@ -12,7 +11,6 @@ import ( "github.com/inspektor-gadget/inspektor-gadget/pkg/operators/kubenameresolver" "github.com/inspektor-gadget/inspektor-gadget/pkg/types" "github.com/kubescape/go-logger" - "github.com/kubescape/go-logger/helpers" ) func (ch *IGContainerWatcher) networkEventCallback(event *tracernetworktypes.Event) { @@ -20,28 +18,22 @@ func (ch *IGContainerWatcher) networkEventCallback(event *tracernetworktypes.Eve return } - if event.Type != types.NORMAL { - // dropped event - if !strings.Contains(event.Message, "stop tracing container") { - logger.L().Ctx(ch.ctx).Warning("network tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message)) - } - } else { - ch.containerCollection.EnrichByMntNs(&event.CommonData, event.MountNsID) - ch.containerCollection.EnrichByNetNs(&event.CommonData, event.NetNsID) + // do not skip dropped events as their processing is done in the worker - if ch.kubeIPInstance != nil { - _ = ch.kubeIPInstance.EnrichEvent(event) - } - if ch.kubeNameInstance != nil { - _ = ch.kubeNameInstance.EnrichEvent(event) - } + ch.containerCollection.EnrichByMntNs(&event.CommonData, event.MountNsID) + ch.containerCollection.EnrichByNetNs(&event.CommonData, event.NetNsID) + + if ch.kubeIPInstance != nil { + _ = ch.kubeIPInstance.EnrichEvent(event) + } + if ch.kubeNameInstance != nil { + _ = ch.kubeNameInstance.EnrichEvent(event) } ch.networkWorkerChan <- event } func (ch *IGContainerWatcher) startNetworkTracing() error { - if err := ch.tracerCollection.AddTracer(networkTraceName, ch.containerSelector); err != nil { return fmt.Errorf("adding tracer: %w", err) } @@ -52,7 +44,7 @@ func (ch *IGContainerWatcher) startNetworkTracing() error { } go func() { for event := range ch.networkWorkerChan { - ch.networkWorkerPool.Invoke(*event) + _ = ch.networkWorkerPool.Invoke(*event) } }() @@ -80,7 +72,7 @@ func (ch *IGContainerWatcher) startNetworkTracing() error { _, err = networktracer.ConnectToContainerCollection(config) if err != nil { - return fmt.Errorf("creating tracer: %w", err) + return fmt.Errorf("connecting tracer to container collection: %w", err) } return nil @@ -89,7 +81,7 @@ func (ch *IGContainerWatcher) startNetworkTracing() error { // startKubernetesResolution starts the kubeIP and kube name resolution, which are used to enrich network communication data func (ch *IGContainerWatcher) startKubernetesResolution() error { kubeIPOp := operators.GetRaw(kubeipresolver.OperatorName).(*kubeipresolver.KubeIPResolver) - kubeIPOp.Init(nil) + _ = kubeIPOp.Init(nil) kubeIPInstance, err := kubeIPOp.Instantiate(nil, nil, nil) if err != nil { @@ -97,17 +89,17 @@ func (ch *IGContainerWatcher) startKubernetesResolution() error { } ch.kubeIPInstance = kubeIPInstance - ch.kubeIPInstance.PreGadgetRun() + _ = ch.kubeIPInstance.PreGadgetRun() kubeNameOp := operators.GetRaw(kubenameresolver.OperatorName).(*kubenameresolver.KubeNameResolver) - kubeNameOp.Init(nil) + _ = kubeNameOp.Init(nil) kubeNameInstance, err := kubeNameOp.Instantiate(nil, nil, nil) if err != nil { return fmt.Errorf("creating kube name resolver: %w", err) } ch.kubeNameInstance = kubeNameInstance - ch.kubeNameInstance.PreGadgetRun() + _ = ch.kubeNameInstance.PreGadgetRun() return nil } @@ -117,8 +109,6 @@ func (ch *IGContainerWatcher) stopNetworkTracing() error { if err := ch.tracerCollection.RemoveTracer(networkTraceName); err != nil { return fmt.Errorf("removing tracer: %w", err) } - ch.networkTracer.Close() - return nil } diff --git a/pkg/containerwatcher/v1/open.go b/pkg/containerwatcher/v1/open.go index 05c7b21d..41d19655 100644 --- a/pkg/containerwatcher/v1/open.go +++ b/pkg/containerwatcher/v1/open.go @@ -6,15 +6,15 @@ import ( traceropen "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/open/tracer" traceropentype "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/open/types" "github.com/inspektor-gadget/inspektor-gadget/pkg/types" - "github.com/kubescape/go-logger" - "github.com/kubescape/go-logger/helpers" ) func (ch *IGContainerWatcher) openEventCallback(event *traceropentype.Event) { - if event.Type != types.NORMAL { - // dropped event - logger.L().Ctx(ch.ctx).Warning("open tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message)) + if event.Type == types.DEBUG { + return } + + // do not skip dropped events as their processing is done in the worker + if event.Err > -1 && event.FullPath != "" { ch.openWorkerChan <- event } @@ -25,12 +25,6 @@ func (ch *IGContainerWatcher) startOpenTracing() error { return fmt.Errorf("adding tracer: %w", err) } - go func() { - for c := range ch.openWorkerChan { - _ = ch.openWorkerPool.Invoke(*c) - } - }() - // Get mount namespace map to filter by containers openMountnsmap, err := ch.tracerCollection.TracerMountNsMap(openTraceName) if err != nil { @@ -41,6 +35,12 @@ func (ch *IGContainerWatcher) startOpenTracing() error { if err != nil { return fmt.Errorf("creating tracer: %w", err) } + go func() { + for event := range ch.openWorkerChan { + _ = ch.openWorkerPool.Invoke(*event) + } + }() + ch.openTracer = tracerOpen return nil diff --git a/pkg/containerwatcher/v1/randomx.go b/pkg/containerwatcher/v1/randomx.go index 58562713..f3dccc3a 100644 --- a/pkg/containerwatcher/v1/randomx.go +++ b/pkg/containerwatcher/v1/randomx.go @@ -12,8 +12,11 @@ import ( ) func (ch *IGContainerWatcher) randomxEventCallback(event *tracerrandomxtype.Event) { - if event.Type != types.NORMAL { - // dropped event + if event.Type == types.DEBUG { + return + } + + if isDroppedEvent(event.Type, event.Message) { logger.L().Ctx(ch.ctx).Warning("randomx tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message)) return } @@ -32,16 +35,16 @@ func (ch *IGContainerWatcher) startRandomxTracing() error { return fmt.Errorf("getting randomxMountnsmap: %w", err) } + tracerrandomx, err := tracerandomx.NewTracer(&tracerandomx.Config{MountnsMap: randomxMountnsmap}, ch.containerCollection, ch.randomxEventCallback) + if err != nil { + return fmt.Errorf("creating tracer: %w", err) + } go func() { for event := range ch.randomxWorkerChan { - ch.randomxWorkerPool.Invoke(*event) + _ = ch.randomxWorkerPool.Invoke(*event) } }() - tracerrandomx, err := tracerandomx.NewTracer(&tracerandomx.Config{MountnsMap: randomxMountnsmap}, ch.containerCollection, ch.randomxEventCallback) - if err != nil { - return fmt.Errorf("creating tracer: %w", err) - } ch.randomxTracer = tracerrandomx return nil diff --git a/pkg/containerwatcher/v1/symlink.go b/pkg/containerwatcher/v1/symlink.go index 14e48855..43708498 100644 --- a/pkg/containerwatcher/v1/symlink.go +++ b/pkg/containerwatcher/v1/symlink.go @@ -12,8 +12,11 @@ import ( ) func (ch *IGContainerWatcher) symlinkEventCallback(event *tracersymlinktype.Event) { - if event.Type != types.NORMAL { - // dropped event + if event.Type == types.DEBUG { + return + } + + if isDroppedEvent(event.Type, event.Message) { logger.L().Ctx(ch.ctx).Warning("symlink tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message)) return } @@ -32,17 +35,17 @@ func (ch *IGContainerWatcher) startSymlinkTracing() error { return fmt.Errorf("getting symlinkMountnsmap: %w", err) } + tracerSymlink, err := tracersymlink.NewTracer(&tracersymlink.Config{MountnsMap: symlinkMountnsmap}, ch.containerCollection, ch.symlinkEventCallback) + if err != nil { + return fmt.Errorf("creating tracer: %w", err) + } go func() { for event := range ch.symlinkWorkerChan { - ch.symlinkWorkerPool.Invoke(*event) + _ = ch.symlinkWorkerPool.Invoke(*event) } }() - tracersymlink, err := tracersymlink.NewTracer(&tracersymlink.Config{MountnsMap: symlinkMountnsmap}, ch.containerCollection, ch.symlinkEventCallback) - if err != nil { - return fmt.Errorf("creating tracer: %w", err) - } - ch.symlinkTracer = tracersymlink + ch.symlinkTracer = tracerSymlink return nil }