Merge pull request #275 from kubescape/feature/traceloop
Feature/traceloop
amitschendel authored May 28, 2024
2 parents 60f89b8 + 795462e commit 9cb4d95
Showing 24 changed files with 254 additions and 439 deletions.
@@ -4,10 +4,10 @@ import containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/con

type ApplicationProfileManagerClient interface {
ContainerCallback(notif containercollection.PubSubEvent)
RegisterPeekFunc(peek func(mntns uint64) ([]string, error))
ReportCapability(k8sContainerID, capability string)
ReportFileExec(k8sContainerID, path string, args []string)
ReportFileOpen(k8sContainerID, path string, flags []string)
ReportSyscallEvent(k8sContainerID string, syscall string)
ReportDroppedEvent(k8sContainerID string)
ContainerReachedMaxTime(containerID string)
}
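
Note on the interface change above: the pull-style RegisterPeekFunc (the manager periodically peeked at syscalls by mount namespace) is replaced by a push-style ReportSyscallEvent, so syscalls now arrive one event at a time, like capabilities, execs and opens. A minimal, self-contained sketch of a caller feeding events through the new method; the reporter type and syscall list are illustrative, not part of this commit:

// --- illustrative sketch, not part of this commit ---
package example

import "fmt"

// syscallReporter is reduced to the single method this PR adds to the client interface.
type syscallReporter interface {
	ReportSyscallEvent(k8sContainerID string, syscall string)
}

type logReporter struct{}

func (logReporter) ReportSyscallEvent(id, syscall string) { fmt.Println(id, syscall) }

func forwardSyscalls(r syscallReporter, k8sContainerID string, syscalls []string) {
	for _, name := range syscalls {
		r.ReportSyscallEvent(k8sContainerID, name) // push model: one call per observed syscall
	}
}
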
@@ -15,10 +15,6 @@ func (a ApplicationProfileManagerMock) ContainerCallback(_ containercollection.P
// noop
}

func (a ApplicationProfileManagerMock) RegisterPeekFunc(_ func(mntns uint64) ([]string, error)) {
// noop
}

func (a ApplicationProfileManagerMock) ReportCapability(_, _ string) {
// noop
}
@@ -31,6 +27,10 @@ func (a ApplicationProfileManagerMock) ReportFileOpen(_, _ string, _ []string) {
// noop
}

func (a ApplicationProfileManagerMock) ReportSyscallEvent(_ string, _ string) {
// noop
}

func (a ApplicationProfileManagerMock) ReportDroppedEvent(_ string) {
// noop
}
40 changes: 24 additions & 16 deletions pkg/applicationprofilemanager/v1/applicationprofile_manager.go
@@ -50,11 +50,11 @@ type ApplicationProfileManager struct {
toSaveCapabilities maps.SafeMap[string, mapset.Set[string]] // key is k8sContainerID
toSaveExecs maps.SafeMap[string, *maps.SafeMap[string, []string]] // key is k8sContainerID
toSaveOpens maps.SafeMap[string, *maps.SafeMap[string, mapset.Set[string]]] // key is k8sContainerID
toSaveSyscalls maps.SafeMap[string, mapset.Set[string]] // key is k8sContainerID
watchedContainerChannels maps.SafeMap[string, chan error] // key is ContainerID
k8sClient k8sclient.K8sClientInterface
k8sObjectCache objectcache.K8sObjectCache
storageClient storage.StorageClient
syscallPeekFunc func(nsMountId uint64) ([]string, error)
preRunningContainerIDs mapset.Set[string]
}

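The struct above gains toSaveSyscalls, a thread-safe map keyed by k8sContainerID whose values are sets of pending syscalls, replacing the syscallPeekFunc hook. A rough standard-library equivalent of that shape, shown only to illustrate the per-container bookkeeping (the real code uses the SafeMap and mapset types already imported by this file):

// --- illustrative sketch, not part of this commit ---
package example

import (
	"sync"

	mapset "github.com/deckarep/golang-set/v2"
)

// pendingSyscalls mimics toSaveSyscalls: one set of not-yet-persisted syscalls per container.
type pendingSyscalls struct {
	mu   sync.Mutex
	byID map[string]mapset.Set[string] // key is k8sContainerID
}

func (p *pendingSyscalls) add(k8sContainerID, syscall string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.byID == nil {
		p.byID = map[string]mapset.Set[string]{}
	}
	set, ok := p.byID[k8sContainerID]
	if !ok {
		set = mapset.NewSet[string]()
		p.byID[k8sContainerID] = set
	}
	set.Add(syscall)
}
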
@@ -138,6 +138,7 @@ func (am *ApplicationProfileManager) deleteResources(watchedContainer *utils.Wat
am.toSaveCapabilities.Delete(watchedContainer.K8sContainerID)
am.toSaveExecs.Delete(watchedContainer.K8sContainerID)
am.toSaveOpens.Delete(watchedContainer.K8sContainerID)
am.toSaveSyscalls.Delete(watchedContainer.K8sContainerID)
am.watchedContainerChannels.Delete(watchedContainer.ContainerID)
}
func (am *ApplicationProfileManager) ContainerReachedMaxTime(containerID string) {
@@ -238,17 +239,15 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon

// application activity is deprecated
// syscalls now reside in the application profile

// get syscalls from IG
var observedSyscalls []string
var toSaveSyscalls []string
if am.syscallPeekFunc != nil {
if observedSyscalls, err = am.syscallPeekFunc(watchedContainer.NsMntId); err == nil {
// check if we have new activities to save
savedSyscalls := am.savedSyscalls.Get(watchedContainer.K8sContainerID)
toSaveSyscallsSet := mapset.NewSet[string](observedSyscalls...).Difference(savedSyscalls)
if !toSaveSyscallsSet.IsEmpty() {
toSaveSyscalls = toSaveSyscallsSet.ToSlice()
if toSaveSyscalls := am.toSaveSyscalls.Get(watchedContainer.K8sContainerID); toSaveSyscalls.Cardinality() > 0 {
// remove syscalls to save in a thread safe way using Pop
for {
syscall, continuePop := toSaveSyscalls.Pop()
if continuePop {
observedSyscalls = append(observedSyscalls, syscall)
} else {
break
}
}
}
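
The hunk above replaces the peek-based collection with a drain of the per-container toSaveSyscalls set: Pop removes one element at a time under the set's own lock, so events reported concurrently during the flush are simply kept for the next save cycle. The same drain pattern in isolation, using the mapset library this file already depends on (names and values are illustrative):

// --- illustrative sketch, not part of this commit ---
package example

import (
	mapset "github.com/deckarep/golang-set/v2"
)

func drain(pending mapset.Set[string]) []string {
	var observed []string
	for {
		syscall, ok := pending.Pop() // removes and returns an arbitrary element
		if !ok {
			break // set is empty
		}
		observed = append(observed, syscall)
	}
	return observed // order is not guaranteed
}
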
@@ -300,7 +299,7 @@
// 3a. the object is missing its container slice - ADD one with the container profile at the right index
// 3b. the object is missing the container profile - ADD the container profile at the right index
// 3c. default - patch the container ourselves and REPLACE it at the right index
if len(capabilities) > 0 || len(execs) > 0 || len(opens) > 0 || len(toSaveSyscalls) > 0 || watchedContainer.StatusUpdated() {
if len(capabilities) > 0 || len(execs) > 0 || len(opens) > 0 || len(observedSyscalls) > 0 || watchedContainer.StatusUpdated() {
// 0. calculate patch
operations := utils.CreateCapabilitiesPatchOperations(capabilities, observedSyscalls, execs, opens, watchedContainer.ContainerType.String(), watchedContainer.ContainerIndex)
operations = utils.AppendStatusAnnotationPatchOperations(operations, watchedContainer)
@@ -465,6 +464,8 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
if gotErr != nil {
// restore capabilities set
am.toSaveCapabilities.Get(watchedContainer.K8sContainerID).Append(capabilities...)
// restore syscalls set
am.toSaveSyscalls.Get(watchedContainer.K8sContainerID).Append(observedSyscalls...)
// restore execs map entries
toSaveExecs.Range(func(uniqueExecIdentifier string, v []string) bool {
if !am.toSaveExecs.Get(watchedContainer.K8sContainerID).Has(uniqueExecIdentifier) {
@@ -479,7 +480,7 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
watchedContainer.ResetStatusUpdatedFlag()

// record saved syscalls
am.savedSyscalls.Get(watchedContainer.K8sContainerID).Append(toSaveSyscalls...)
am.savedSyscalls.Get(watchedContainer.K8sContainerID).Append(observedSyscalls...)
// record saved capabilities
am.savedCapabilities.Get(watchedContainer.K8sContainerID).Append(capabilities...)
// record saved execs
@@ -568,6 +569,7 @@ func (am *ApplicationProfileManager) ContainerCallback(notif containercollection
am.toSaveCapabilities.Set(k8sContainerID, mapset.NewSet[string]())
am.toSaveExecs.Set(k8sContainerID, new(maps.SafeMap[string, []string]))
am.toSaveOpens.Set(k8sContainerID, new(maps.SafeMap[string, mapset.Set[string]]))
am.toSaveSyscalls.Set(k8sContainerID, mapset.NewSet[string]())
am.removedContainers.Remove(k8sContainerID) // make sure container is not in the removed list
am.trackedContainers.Add(k8sContainerID)
go am.startApplicationProfiling(ctx, notif.Container, k8sContainerID)
@@ -580,15 +582,21 @@
}
}

func (am *ApplicationProfileManager) RegisterPeekFunc(peek func(mntns uint64) ([]string, error)) {
am.syscallPeekFunc = peek
func (am *ApplicationProfileManager) ReportSyscallEvent(k8sContainerID string, syscall string) {
if err := am.waitForContainer(k8sContainerID); err != nil {
return
}
if am.savedSyscalls.Get(k8sContainerID).ContainsOne(syscall) {
return
}
am.toSaveSyscalls.Get(k8sContainerID).Add(syscall)
}

func (am *ApplicationProfileManager) ReportCapability(k8sContainerID, capability string) {
if err := am.waitForContainer(k8sContainerID); err != nil {
return
}
if am.savedCapabilities.Has(capability) {
if am.savedCapabilities.Get(k8sContainerID).ContainsOne(capability) {
return
}
am.toSaveCapabilities.Get(k8sContainerID).Add(capability)
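
ReportSyscallEvent and the corrected ReportCapability check above follow the same two-set dedup: the saved* set holds what is already persisted in the profile, the toSave* set holds what the next saveProfile flush should add, and the saved set only grows after a successful patch (which is why the error path earlier re-Appends observedSyscalls). A stripped-down sketch of that pattern with illustrative names:

// --- illustrative sketch, not part of this commit ---
package example

import (
	mapset "github.com/deckarep/golang-set/v2"
)

type syscallBook struct {
	saved   mapset.Set[string] // already stored in the application profile
	pending mapset.Set[string] // drained by the next save cycle
}

func (b *syscallBook) report(syscall string) {
	if b.saved.ContainsOne(syscall) {
		return // nothing new to persist
	}
	b.pending.Add(syscall)
}

func (b *syscallBook) markSaved(flushed []string) {
	b.saved.Append(flushed...) // only called once the profile patch succeeded
}
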
@@ -44,10 +44,9 @@ func TestApplicationProfileManager(t *testing.T) {
},
},
}
// register peek function for syscall tracer
go am.RegisterPeekFunc(func(_ uint64) ([]string, error) {
return []string{"dup", "listen"}, nil
})
// report syscall events
go am.ReportSyscallEvent("ns/pod/cont", "dup")
go am.ReportSyscallEvent("ns/pod/cont", "listen")
// report capability
go am.ReportCapability("ns/pod/cont", "NET_BIND_SERVICE")
// report file exec
36 changes: 29 additions & 7 deletions pkg/containerwatcher/v1/container_watcher.go
@@ -23,7 +23,6 @@ import (

mapset "github.com/deckarep/golang-set/v2"
containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection"
tracerseccomp "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/advise/seccomp/tracer"
tracercapabilities "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/capabilities/tracer"
tracercapabilitiestype "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/capabilities/types"
tracerdns "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/dns/tracer"
@@ -34,6 +33,8 @@ import (
tracernetworktype "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/network/types"
traceropen "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/open/tracer"
traceropentype "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/open/types"
tracersyscalls "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/traceloop/tracer"
tracersyscallstype "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/traceloop/types"
"github.com/inspektor-gadget/inspektor-gadget/pkg/operators"
"github.com/inspektor-gadget/inspektor-gadget/pkg/socketenricher"
tracercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/tracer-collection"
@@ -51,12 +52,14 @@ const (
dnsTraceName = "trace_dns"
openTraceName = "trace_open"
randomxTraceName = "trace_randomx"
syscallsTraceName = "trace_syscalls"
capabilitiesWorkerPoolSize = 1
execWorkerPoolSize = 2
openWorkerPoolSize = 8
networkWorkerPoolSize = 1
dnsWorkerPoolSize = 5
randomxWorkerPoolSize = 1
syscallsWorkerPoolSize = 3
)

type IGContainerWatcher struct {
Expand Down Expand Up @@ -86,7 +89,7 @@ type IGContainerWatcher struct {
capabilitiesTracer *tracercapabilities.Tracer
execTracer *tracerexec.Tracer
openTracer *traceropen.Tracer
syscallTracer *tracerseccomp.Tracer
syscallTracer *tracersyscalls.Tracer
networkTracer *tracernetwork.Tracer
dnsTracer *tracerdns.Tracer
randomxTracer *tracerandomx.Tracer
@@ -100,13 +103,15 @@ type IGContainerWatcher struct {
networkWorkerPool *ants.PoolWithFunc
dnsWorkerPool *ants.PoolWithFunc
randomxWorkerPool *ants.PoolWithFunc
syscallsWorkerPool *ants.PoolWithFunc

capabilitiesWorkerChan chan *tracercapabilitiestype.Event
execWorkerChan chan *tracerexectype.Event
openWorkerChan chan *traceropentype.Event
networkWorkerChan chan *tracernetworktype.Event
dnsWorkerChan chan *tracerdnstype.Event
randomxWorkerChan chan *tracerandomxtype.Event
syscallsWorkerChan chan *tracersyscallstype.Event

preRunningContainersIDs mapset.Set[string]

@@ -138,7 +143,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
metrics.ReportEvent(utils.CapabilitiesEventType)
k8sContainerID := utils.CreateK8sContainerID(event.K8s.Namespace, event.K8s.PodName, event.K8s.ContainerName)
applicationProfileManager.ReportCapability(k8sContainerID, event.CapName)
ruleManager.ReportCapability(k8sContainerID, event)
ruleManager.ReportCapability(event)
})
if err != nil {
return nil, fmt.Errorf("creating capabilities worker pool: %w", err)
@@ -166,7 +171,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
metrics.ReportEvent(utils.ExecveEventType)
applicationProfileManager.ReportFileExec(k8sContainerID, path, event.Args)
relevancyManager.ReportFileExec(event.Runtime.ContainerID, k8sContainerID, path)
ruleManager.ReportFileExec(k8sContainerID, event)
ruleManager.ReportFileExec(event)
malwareManager.ReportFileExec(k8sContainerID, event)
})
if err != nil {
@@ -195,7 +200,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
metrics.ReportEvent(utils.OpenEventType)
applicationProfileManager.ReportFileOpen(k8sContainerID, path, event.Flags)
relevancyManager.ReportFileOpen(event.Runtime.ContainerID, k8sContainerID, path)
ruleManager.ReportFileOpen(k8sContainerID, event)
ruleManager.ReportFileOpen(event)
malwareManager.ReportFileOpen(k8sContainerID, event)
})
if err != nil {
@@ -219,7 +224,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
metrics.ReportEvent(utils.NetworkEventType)
networkManagerv1Client.ReportNetworkEvent(event.Runtime.ContainerID, event)
networkManagerClient.ReportNetworkEvent(k8sContainerID, event)
ruleManager.ReportNetworkEvent(event.Runtime.ContainerID, event)
ruleManager.ReportNetworkEvent(event)
})
if err != nil {
return nil, fmt.Errorf("creating network worker pool: %w", err)
@@ -252,11 +257,26 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
return
}
metrics.ReportEvent(utils.RandomXEventType)
ruleManager.ReportRandomxEvent(event.Runtime.ContainerID, event)
ruleManager.ReportRandomxEvent(event)
})
if err != nil {
return nil, fmt.Errorf("creating randomx worker pool: %w", err)
}
// Create a syscalls worker pool
syscallsWorkerPool, err := ants.NewPoolWithFunc(syscallsWorkerPoolSize, func(i interface{}) {
event := i.(tracersyscallstype.Event)
if event.K8s.ContainerName == "" {
return
}
k8sContainerID := utils.CreateK8sContainerID(event.K8s.Namespace, event.K8s.PodName, event.K8s.ContainerName)

metrics.ReportEvent(utils.SyscallEventType)
applicationProfileManager.ReportSyscallEvent(k8sContainerID, event.Syscall)
ruleManager.ReportSyscallEvent(event)
})
if err != nil {
return nil, fmt.Errorf("creating syscalls worker pool: %w", err)
}

return &IGContainerWatcher{
// Configuration
@@ -285,6 +305,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
networkWorkerPool: networkWorkerPool,
dnsWorkerPool: dnsWorkerPool,
randomxWorkerPool: randomxWorkerPool,
syscallsWorkerPool: syscallsWorkerPool,
metrics: metrics,
preRunningContainersIDs: preRunningContainers,

@@ -295,6 +316,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
networkWorkerChan: make(chan *tracernetworktype.Event, 500000),
dnsWorkerChan: make(chan *tracerdnstype.Event, 100000),
randomxWorkerChan: make(chan *tracerandomxtype.Event, 5000),
syscallsWorkerChan: make(chan *tracersyscallstype.Event, 100000),

// cache
ruleBindingPodNotify: ruleBindingPodNotify,
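
The syscalls worker pool and channel added in this file mirror the other tracers: traceloop events are handed to a fixed-size ants pool so a slow consumer cannot block the tracer's read loop. A self-contained sketch of that hand-off; the event type, the pool size of 3 (matching syscallsWorkerPoolSize) and the WaitGroup are illustrative, not the watcher's actual wiring:

// --- illustrative sketch, not part of this commit ---
package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

type syscallEvent struct {
	ContainerID string
	Syscall     string
}

func main() {
	var wg sync.WaitGroup
	// At most 3 events are processed concurrently, mirroring syscallsWorkerPoolSize.
	pool, err := ants.NewPoolWithFunc(3, func(i interface{}) {
		defer wg.Done()
		ev := i.(*syscallEvent)
		fmt.Printf("container %s called %s\n", ev.ContainerID, ev.Syscall)
	})
	if err != nil {
		panic(err)
	}
	defer pool.Release()

	for _, ev := range []*syscallEvent{
		{ContainerID: "abc", Syscall: "openat"},
		{ContainerID: "abc", Syscall: "execve"},
	} {
		wg.Add(1)
		_ = pool.Invoke(ev) // hand the event to a free worker
	}
	wg.Wait()
}
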
34 changes: 31 additions & 3 deletions pkg/containerwatcher/v1/container_watcher_private.go
@@ -34,6 +34,32 @@ func (ch *IGContainerWatcher) containerCallback(notif containercollection.PubSub
switch notif.Type {
case containercollection.EventTypeAddContainer:
logger.L().Info("start monitor on container", helpers.String("container ID", notif.Container.Runtime.ContainerID), helpers.String("k8s workload", k8sContainerID))

if ch.syscallTracer != nil {
// Attach the container to the syscall tracer
if err := ch.syscallTracer.Attach(notif.Container.Runtime.ContainerID, notif.Container.Mntns); err != nil {
logger.L().Fatal("attaching container to syscall tracer", helpers.String("container ID", notif.Container.Runtime.ContainerID), helpers.String("k8s workload", k8sContainerID), helpers.Error(err))
}

// Read the syscall tracer events in a separate goroutine.
go func() {
for {
evs, err := ch.syscallTracer.Read(notif.Container.Runtime.ContainerID)
if err != nil {
logger.L().Debug("syscalls perf buffer closed", helpers.String("error", err.Error()))
return
}
for _, ev := range evs {
ev.SetContainerMetadata(notif.Container)
ch.syscallEventCallback(ev)
}

// Sleep for a while before reading the next batch of events.
time.Sleep(2 * time.Second) // TODO: make this configurable.
}
}()
}

time.AfterFunc(ch.cfg.MaxSniffingTime, func() {
logger.L().Info("monitoring time ended", helpers.String("container ID", notif.Container.Runtime.ContainerID), helpers.String("k8s workload", k8sContainerID))
ch.timeBasedContainers.Remove(notif.Container.Runtime.ContainerID)
@@ -49,6 +75,8 @@ func (ch *IGContainerWatcher) containerCallback(notif containercollection.PubSub
helpers.String("k8s workload", k8sContainerID))
ch.preRunningContainersIDs.Remove(notif.Container.Runtime.ContainerID)
ch.timeBasedContainers.Remove(notif.Container.Runtime.ContainerID)
ch.syscallTracer.Detach(notif.Container.Mntns)
ch.syscallTracer.Delete(notif.Container.Runtime.ContainerID)
}
}
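
Read together with the add-container branch above, the per-container traceloop lifecycle is: Attach on container add, poll Read on a fixed interval, and Detach plus Delete on container remove. A condensed sketch of that call order against a minimal stand-in interface — the real tracer is Inspektor Gadget's traceloop tracer, which needs eBPF privileges, and its exact signatures may differ from this stub:

// --- illustrative sketch, not part of this commit ---
package example

import (
	"fmt"
	"time"
)

// syscallTracer stands in for the calls containerCallback makes above.
type syscallTracer interface {
	Attach(containerID string, mntns uint64) error
	Read(containerID string) ([]string, error)
	Detach(mntns uint64) error
	Delete(containerID string) error
}

// watch attaches a container and polls its events until Read fails (container gone).
func watch(t syscallTracer, containerID string, mntns uint64) error {
	if err := t.Attach(containerID, mntns); err != nil {
		return err
	}
	go func() {
		for {
			evs, err := t.Read(containerID)
			if err != nil {
				return // perf buffer closed
			}
			for _, ev := range evs {
				fmt.Println("syscall:", ev)
			}
			time.Sleep(2 * time.Second) // same polling interval as the PR
		}
	}()
	return nil
}

// unwatch mirrors the remove-container branch: stop collecting, then drop per-container state.
func unwatch(t syscallTracer, containerID string, mntns uint64) {
	_ = t.Detach(mntns)
	_ = t.Delete(containerID)
}
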
func (ch *IGContainerWatcher) startContainerCollection(ctx context.Context) error {
Expand Down Expand Up @@ -167,8 +195,8 @@ func (ch *IGContainerWatcher) stopContainerCollection() {
func (ch *IGContainerWatcher) startTracers() error {
if ch.cfg.EnableApplicationProfile {
// Start syscall tracer
if err := ch.startSystemcallTracing(); err != nil {
logger.L().Error("error starting seccomp tracing", helpers.Error(err))
if err := ch.startSyscallTracing(); err != nil {
logger.L().Error("error starting syscall tracing", helpers.Error(err))
return err
}
// Start capabilities tracer
@@ -241,7 +269,7 @@ func (ch *IGContainerWatcher) stopTracers() error {
}
// Stop syscall tracer
if err := ch.stopSystemcallTracing(); err != nil {
logger.L().Error("error stopping seccomp tracing", helpers.Error(err))
logger.L().Error("error stopping syscall tracing", helpers.Error(err))
errs = errors.Join(errs, err)
}
}