Skip to content

Commit

Permalink
handle dropped events in a common manner
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Bertschy <matthias.bertschy@gmail.com>
  • Loading branch information
matthyx committed Jul 11, 2024
1 parent d394855 commit 1ac672f
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 84 deletions.
16 changes: 13 additions & 3 deletions pkg/containerwatcher/v1/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ import (
)

func (ch *IGContainerWatcher) capabilitiesEventCallback(event *tracercapabilitiestype.Event) {
if event.Type != types.NORMAL {
// dropped event
if event.Type == types.DEBUG {
return
}

if isDroppedEvent(event.Type, event.Message) {
logger.L().Ctx(ch.ctx).Warning("capabilities tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message))
return
}
_ = ch.capabilitiesWorkerPool.Invoke(*event)

ch.capabilitiesWorkerChan <- event
}

func (ch *IGContainerWatcher) startCapabilitiesTracing() error {
Expand All @@ -34,6 +38,12 @@ func (ch *IGContainerWatcher) startCapabilitiesTracing() error {
if err != nil {
return fmt.Errorf("creating tracer: %w", err)
}
go func() {
for event := range ch.capabilitiesWorkerChan {
_ = ch.capabilitiesWorkerPool.Invoke(*event)
}
}()

ch.capabilitiesTracer = tracerCapabilities

return nil
Expand Down
13 changes: 13 additions & 0 deletions pkg/containerwatcher/v1/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package containerwatcher

import (
"strings"

"github.com/inspektor-gadget/inspektor-gadget/pkg/types"
)

func isDroppedEvent(eventType types.EventType, message string) bool {
return eventType != types.NORMAL &&
eventType != types.DEBUG &&
strings.Contains(message, "stop tracing container")
}
12 changes: 4 additions & 8 deletions pkg/containerwatcher/v1/container_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/inspektor-gadget/inspektor-gadget/pkg/operators"
"github.com/inspektor-gadget/inspektor-gadget/pkg/socketenricher"
tracercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/tracer-collection"
"github.com/inspektor-gadget/inspektor-gadget/pkg/types"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/k8s-interface/k8sinterface"
Expand Down Expand Up @@ -165,8 +164,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli

k8sContainerID := utils.CreateK8sContainerID(event.K8s.Namespace, event.K8s.PodName, event.K8s.ContainerName)

// dropped events
if event.Type != types.NORMAL {
if isDroppedEvent(event.Type, event.Message) {
applicationProfileManager.ReportDroppedEvent(k8sContainerID)
return
}
Expand All @@ -193,8 +191,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
}
k8sContainerID := utils.CreateK8sContainerID(event.K8s.Namespace, event.K8s.PodName, event.K8s.ContainerName)

// dropped events
if event.Type != types.NORMAL {
if isDroppedEvent(event.Type, event.Message) {
applicationProfileManager.ReportDroppedEvent(k8sContainerID)
return
}
Expand Down Expand Up @@ -222,8 +219,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
}
k8sContainerID := utils.CreateK8sContainerID(event.K8s.Namespace, event.K8s.PodName, event.K8s.ContainerName)

// dropped events
if event.Type != types.NORMAL {
if isDroppedEvent(event.Type, event.Message) {
networkManagerv1Client.ReportDroppedEvent(event.Runtime.ContainerID, event)
networkManagerClient.ReportDroppedEvent(k8sContainerID)
return
Expand Down Expand Up @@ -282,7 +278,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
return nil, fmt.Errorf("creating symlink worker pool: %w", err)
}
// Create a hardlink worker pool
hardlinkWorkerPool, err := ants.NewPoolWithFunc(symlinkWorkerPoolSize, func(i interface{}) {
hardlinkWorkerPool, err := ants.NewPoolWithFunc(hardlinkWorkerPoolSize, func(i interface{}) {
event := i.(tracerhardlinktype.Event)
if event.K8s.ContainerName == "" {
return
Expand Down
10 changes: 6 additions & 4 deletions pkg/containerwatcher/v1/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import (
)

func (ch *IGContainerWatcher) dnsEventCallback(event *tracerdnstype.Event) {
if event.Type != types.NORMAL && event.Type != types.DEBUG {
if event.Type == types.DEBUG {
return
}

if isDroppedEvent(event.Type, event.Message) {
logger.L().Ctx(ch.ctx).Warning("dns tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message))
return
}
Expand All @@ -34,7 +38,7 @@ func (ch *IGContainerWatcher) startDNSTracing() error {
}
go func() {
for event := range ch.dnsWorkerChan {
ch.dnsWorkerPool.Invoke(*event)
_ = ch.dnsWorkerPool.Invoke(*event)
}
}()

Expand Down Expand Up @@ -68,8 +72,6 @@ func (ch *IGContainerWatcher) stopDNSTracing() error {
if err := ch.tracerCollection.RemoveTracer(dnsTraceName); err != nil {
return fmt.Errorf("removing tracer: %w", err)
}

ch.dnsTracer.Close()

return nil
}
20 changes: 10 additions & 10 deletions pkg/containerwatcher/v1/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (
tracerexec "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/exec/tracer"
tracerexectype "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/exec/types"
"github.com/inspektor-gadget/inspektor-gadget/pkg/types"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
)

func (ch *IGContainerWatcher) execEventCallback(event *tracerexectype.Event) {
if event.Type != types.NORMAL {
// dropped event
logger.L().Ctx(ch.ctx).Warning("exec tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message))
if event.Type == types.DEBUG {
return
}

// do not skip dropped events as their processing is done in the worker

if event.Retval > -1 && event.Comm != "" {
ch.execWorkerChan <- event
}
Expand All @@ -31,16 +31,16 @@ func (ch *IGContainerWatcher) startExecTracing() error {
return fmt.Errorf("getting execMountnsmap: %w", err)
}

tracerExec, err := tracerexec.NewTracer(&tracerexec.Config{MountnsMap: execMountnsmap, GetPaths: true}, ch.containerCollection, ch.execEventCallback)
if err != nil {
return fmt.Errorf("creating tracer: %w", err)
}
go func() {
for event := range ch.execWorkerChan {
ch.execWorkerPool.Invoke(*event)
_ = ch.execWorkerPool.Invoke(*event)
}
}()

tracerExec, err := tracerexec.NewTracer(&tracerexec.Config{MountnsMap: execMountnsmap, GetPaths: true}, ch.containerCollection, ch.execEventCallback)
if err != nil {
return fmt.Errorf("creating tracer: %w", err)
}
ch.execTracer = tracerExec

return nil
Expand Down
19 changes: 11 additions & 8 deletions pkg/containerwatcher/v1/hardlink.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import (
)

func (ch *IGContainerWatcher) hardlinkEventCallback(event *tracerhardlinktype.Event) {
if event.Type != types.NORMAL {
// dropped event
if event.Type == types.DEBUG {
return
}

if isDroppedEvent(event.Type, event.Message) {
logger.L().Ctx(ch.ctx).Warning("hardlink tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message))
return
}
Expand All @@ -32,17 +35,17 @@ func (ch *IGContainerWatcher) startHardlinkTracing() error {
return fmt.Errorf("getting hardlinkMountnsmap: %w", err)
}

tracerHardlink, err := tracerhardlink.NewTracer(&tracerhardlink.Config{MountnsMap: hardlinkMountnsmap}, ch.containerCollection, ch.hardlinkEventCallback)
if err != nil {
return fmt.Errorf("creating tracer: %w", err)
}
go func() {
for event := range ch.hardlinkWorkerChan {
ch.hardlinkWorkerPool.Invoke(*event)
_ = ch.hardlinkWorkerPool.Invoke(*event)
}
}()

tracerhardlink, err := tracerhardlink.NewTracer(&tracerhardlink.Config{MountnsMap: hardlinkMountnsmap}, ch.containerCollection, ch.hardlinkEventCallback)
if err != nil {
return fmt.Errorf("creating tracer: %w", err)
}
ch.hardlinkTracer = tracerhardlink
ch.hardlinkTracer = tracerHardlink

return nil
}
Expand Down
40 changes: 15 additions & 25 deletions pkg/containerwatcher/v1/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package containerwatcher

import (
"fmt"
"strings"

"github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection/networktracer"
tracernetwork "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/network/tracer"
Expand All @@ -12,36 +11,29 @@ import (
"github.com/inspektor-gadget/inspektor-gadget/pkg/operators/kubenameresolver"
"github.com/inspektor-gadget/inspektor-gadget/pkg/types"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
)

func (ch *IGContainerWatcher) networkEventCallback(event *tracernetworktypes.Event) {
if event.Type == types.DEBUG {
return
}

if event.Type != types.NORMAL {
// dropped event
if !strings.Contains(event.Message, "stop tracing container") {
logger.L().Ctx(ch.ctx).Warning("network tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message))
}
} else {
ch.containerCollection.EnrichByMntNs(&event.CommonData, event.MountNsID)
ch.containerCollection.EnrichByNetNs(&event.CommonData, event.NetNsID)
// do not skip dropped events as their processing is done in the worker

if ch.kubeIPInstance != nil {
_ = ch.kubeIPInstance.EnrichEvent(event)
}
if ch.kubeNameInstance != nil {
_ = ch.kubeNameInstance.EnrichEvent(event)
}
ch.containerCollection.EnrichByMntNs(&event.CommonData, event.MountNsID)
ch.containerCollection.EnrichByNetNs(&event.CommonData, event.NetNsID)

if ch.kubeIPInstance != nil {
_ = ch.kubeIPInstance.EnrichEvent(event)
}
if ch.kubeNameInstance != nil {
_ = ch.kubeNameInstance.EnrichEvent(event)
}

ch.networkWorkerChan <- event
}

func (ch *IGContainerWatcher) startNetworkTracing() error {

if err := ch.tracerCollection.AddTracer(networkTraceName, ch.containerSelector); err != nil {
return fmt.Errorf("adding tracer: %w", err)
}
Expand All @@ -52,7 +44,7 @@ func (ch *IGContainerWatcher) startNetworkTracing() error {
}
go func() {
for event := range ch.networkWorkerChan {
ch.networkWorkerPool.Invoke(*event)
_ = ch.networkWorkerPool.Invoke(*event)
}
}()

Expand Down Expand Up @@ -80,7 +72,7 @@ func (ch *IGContainerWatcher) startNetworkTracing() error {

_, err = networktracer.ConnectToContainerCollection(config)
if err != nil {
return fmt.Errorf("creating tracer: %w", err)
return fmt.Errorf("connecting tracer to container collection: %w", err)
}

return nil
Expand All @@ -89,25 +81,25 @@ func (ch *IGContainerWatcher) startNetworkTracing() error {
// startKubernetesResolution starts the kubeIP and kube name resolution, which are used to enrich network communication data
func (ch *IGContainerWatcher) startKubernetesResolution() error {
kubeIPOp := operators.GetRaw(kubeipresolver.OperatorName).(*kubeipresolver.KubeIPResolver)
kubeIPOp.Init(nil)
_ = kubeIPOp.Init(nil)

kubeIPInstance, err := kubeIPOp.Instantiate(nil, nil, nil)
if err != nil {
return fmt.Errorf("creating kube ip resolver: %w", err)
}

ch.kubeIPInstance = kubeIPInstance
ch.kubeIPInstance.PreGadgetRun()
_ = ch.kubeIPInstance.PreGadgetRun()

kubeNameOp := operators.GetRaw(kubenameresolver.OperatorName).(*kubenameresolver.KubeNameResolver)
kubeNameOp.Init(nil)
_ = kubeNameOp.Init(nil)
kubeNameInstance, err := kubeNameOp.Instantiate(nil, nil, nil)
if err != nil {
return fmt.Errorf("creating kube name resolver: %w", err)
}

ch.kubeNameInstance = kubeNameInstance
ch.kubeNameInstance.PreGadgetRun()
_ = ch.kubeNameInstance.PreGadgetRun()

return nil
}
Expand All @@ -117,8 +109,6 @@ func (ch *IGContainerWatcher) stopNetworkTracing() error {
if err := ch.tracerCollection.RemoveTracer(networkTraceName); err != nil {
return fmt.Errorf("removing tracer: %w", err)
}

ch.networkTracer.Close()

return nil
}
22 changes: 11 additions & 11 deletions pkg/containerwatcher/v1/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (
traceropen "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/open/tracer"
traceropentype "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/open/types"
"github.com/inspektor-gadget/inspektor-gadget/pkg/types"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
)

func (ch *IGContainerWatcher) openEventCallback(event *traceropentype.Event) {
if event.Type != types.NORMAL {
// dropped event
logger.L().Ctx(ch.ctx).Warning("open tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message))
if event.Type == types.DEBUG {
return
}

// do not skip dropped events as their processing is done in the worker

if event.Err > -1 && event.FullPath != "" {
ch.openWorkerChan <- event
}
Expand All @@ -25,12 +25,6 @@ func (ch *IGContainerWatcher) startOpenTracing() error {
return fmt.Errorf("adding tracer: %w", err)
}

go func() {
for c := range ch.openWorkerChan {
_ = ch.openWorkerPool.Invoke(*c)
}
}()

// Get mount namespace map to filter by containers
openMountnsmap, err := ch.tracerCollection.TracerMountNsMap(openTraceName)
if err != nil {
Expand All @@ -41,6 +35,12 @@ func (ch *IGContainerWatcher) startOpenTracing() error {
if err != nil {
return fmt.Errorf("creating tracer: %w", err)
}
go func() {
for event := range ch.openWorkerChan {
_ = ch.openWorkerPool.Invoke(*event)
}
}()

ch.openTracer = tracerOpen

return nil
Expand Down
17 changes: 10 additions & 7 deletions pkg/containerwatcher/v1/randomx.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import (
)

func (ch *IGContainerWatcher) randomxEventCallback(event *tracerrandomxtype.Event) {
if event.Type != types.NORMAL {
// dropped event
if event.Type == types.DEBUG {
return
}

if isDroppedEvent(event.Type, event.Message) {
logger.L().Ctx(ch.ctx).Warning("randomx tracer got drop events - we may miss some realtime data", helpers.Interface("event", event), helpers.String("error", event.Message))
return
}
Expand All @@ -32,16 +35,16 @@ func (ch *IGContainerWatcher) startRandomxTracing() error {
return fmt.Errorf("getting randomxMountnsmap: %w", err)
}

tracerrandomx, err := tracerandomx.NewTracer(&tracerandomx.Config{MountnsMap: randomxMountnsmap}, ch.containerCollection, ch.randomxEventCallback)
if err != nil {
return fmt.Errorf("creating tracer: %w", err)
}
go func() {
for event := range ch.randomxWorkerChan {
ch.randomxWorkerPool.Invoke(*event)
_ = ch.randomxWorkerPool.Invoke(*event)
}
}()

tracerrandomx, err := tracerandomx.NewTracer(&tracerandomx.Config{MountnsMap: randomxMountnsmap}, ch.containerCollection, ch.randomxEventCallback)
if err != nil {
return fmt.Errorf("creating tracer: %w", err)
}
ch.randomxTracer = tracerrandomx

return nil
Expand Down
Loading

0 comments on commit 1ac672f

Please sign in to comment.