Skip to content

Commit

Permalink
Merge pull request #389 from kubescape/feature/user-managed
Browse files Browse the repository at this point in the history
User generated application profiles/NN
  • Loading branch information
matthyx authored Oct 29, 2024
2 parents dfd15ec + 26b3371 commit 0c6721c
Show file tree
Hide file tree
Showing 7 changed files with 1,398 additions and 265 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/component-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ jobs:
Test_01_BasicAlertTest,
Test_02_AllAlertsFromMaliciousApp,
Test_03_BasicLoadActivities,
Test_04_MemoryLeak,
# Test_04_MemoryLeak,
Test_05_MemoryLeak_10K_Alerts,
Test_06_KillProcessInTheMiddle,
Test_07_RuleBindingApplyTest,
Test_08_ApplicationProfilePatching,
Test_10_MalwareDetectionTest,
Test_11_EndpointTest,
# Test_10_DemoTest
# Test_11_DuplicationTest
Test_12_MergingProfilesTest,
Test_13_MergingNetworkNeighborhoodTest,
]
steps:
- name: Checkout code
Expand Down
242 changes: 190 additions & 52 deletions pkg/objectcache/applicationprofilecache/applicationprofilecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package applicationprofilecache
import (
"context"
"fmt"
"strings"
"time"

mapset "github.com/deckarep/golang-set/v2"
Expand Down Expand Up @@ -47,30 +48,102 @@ func newApplicationProfileState(ap *v1beta1.ApplicationProfile) applicationProfi
}

type ApplicationProfileCacheImpl struct {
containerToSlug maps.SafeMap[string, string] // cache the containerID to slug mapping, this will enable a quick lookup of the application profile
slugToAppProfile maps.SafeMap[string, *v1beta1.ApplicationProfile] // cache the application profile
slugToContainers maps.SafeMap[string, mapset.Set[string]] // cache the containerIDs that belong to the application profile, this will enable removing from cache AP without pods
slugToState maps.SafeMap[string, applicationProfileState] // cache the containerID to slug mapping, this will enable a quick lookup of the application profile
storageClient versioned.SpdxV1beta1Interface
allProfiles mapset.Set[string] // cache all the application profiles that are ready. this will enable removing from cache AP without pods that are running on the same node
nodeName string
maxDelaySeconds int // maximum delay in seconds before getting the full object from the storage
containerToSlug maps.SafeMap[string, string] // cache the containerID to slug mapping, this will enable a quick lookup of the application profile
slugToAppProfile maps.SafeMap[string, *v1beta1.ApplicationProfile] // cache the application profile
slugToContainers maps.SafeMap[string, mapset.Set[string]] // cache the containerIDs that belong to the application profile, this will enable removing from cache AP without pods
slugToState maps.SafeMap[string, applicationProfileState] // cache the containerID to slug mapping, this will enable a quick lookup of the application profile
storageClient versioned.SpdxV1beta1Interface
allProfiles mapset.Set[string] // cache all the application profiles that are ready. this will enable removing from cache AP without pods that are running on the same node
nodeName string
maxDelaySeconds int // maximum delay in seconds before getting the full object from the storage
userManagedProfiles maps.SafeMap[string, *v1beta1.ApplicationProfile]
}

func NewApplicationProfileCache(nodeName string, storageClient versioned.SpdxV1beta1Interface, maxDelaySeconds int) *ApplicationProfileCacheImpl {
return &ApplicationProfileCacheImpl{
nodeName: nodeName,
maxDelaySeconds: maxDelaySeconds,
storageClient: storageClient,
containerToSlug: maps.SafeMap[string, string]{},
slugToContainers: maps.SafeMap[string, mapset.Set[string]]{},
allProfiles: mapset.NewSet[string](),
nodeName: nodeName,
maxDelaySeconds: maxDelaySeconds,
storageClient: storageClient,
containerToSlug: maps.SafeMap[string, string]{},
slugToAppProfile: maps.SafeMap[string, *v1beta1.ApplicationProfile]{},
slugToContainers: maps.SafeMap[string, mapset.Set[string]]{},
slugToState: maps.SafeMap[string, applicationProfileState]{},
allProfiles: mapset.NewSet[string](),
userManagedProfiles: maps.SafeMap[string, *v1beta1.ApplicationProfile]{},
}

}

// ------------------ objectcache.ApplicationProfileCache methods -----------------------

func (ap *ApplicationProfileCacheImpl) handleUserManagedProfile(appProfile *v1beta1.ApplicationProfile) {
baseProfileName := strings.TrimPrefix(appProfile.GetName(), "ug-")
baseProfileUniqueName := objectcache.UniqueName(appProfile.GetNamespace(), baseProfileName)

// Get the full user managed profile from the storage
userManagedProfile, err := ap.getApplicationProfile(appProfile.GetNamespace(), appProfile.GetName())
if err != nil {
logger.L().Error("failed to get full application profile", helpers.Error(err))
return
}

// Store the user-managed profile temporarily
ap.userManagedProfiles.Set(baseProfileUniqueName, userManagedProfile)

// If we have the base profile cached, fetch a fresh copy and merge.
// If the base profile is not cached yet, the merge will be attempted when it's added.
if ap.slugToAppProfile.Has(baseProfileUniqueName) {
// Fetch fresh base profile from cluster
freshBaseProfile, err := ap.getApplicationProfile(appProfile.GetNamespace(), baseProfileName)
if err != nil {
logger.L().Error("failed to get fresh base profile for merging",
helpers.String("name", baseProfileName),
helpers.String("namespace", appProfile.GetNamespace()),
helpers.Error(err))
return
}

mergedProfile := ap.performMerge(freshBaseProfile, userManagedProfile)
ap.slugToAppProfile.Set(baseProfileUniqueName, mergedProfile)

// Clean up the user-managed profile after successful merge
ap.userManagedProfiles.Delete(baseProfileUniqueName)

logger.L().Debug("merged user-managed profile with fresh base profile",
helpers.String("name", baseProfileName),
helpers.String("namespace", appProfile.GetNamespace()))
}
}

func (ap *ApplicationProfileCacheImpl) addApplicationProfile(obj runtime.Object) {
appProfile := obj.(*v1beta1.ApplicationProfile)
apName := objectcache.MetaUniqueName(appProfile)

if isUserManagedProfile(appProfile) {
ap.handleUserManagedProfile(appProfile)
return
}

// Original behavior for normal profiles
apState := newApplicationProfileState(appProfile)
ap.slugToState.Set(apName, apState)

if apState.status != helpersv1.Completed {
if ap.slugToAppProfile.Has(apName) {
ap.slugToAppProfile.Delete(apName)
ap.allProfiles.Remove(apName)
}
return
}

ap.allProfiles.Add(apName)

if ap.slugToContainers.Has(apName) {
time.AfterFunc(utils.RandomDuration(ap.maxDelaySeconds, time.Second), func() {
ap.addFullApplicationProfile(appProfile, apName)
})
}
}

func (ap *ApplicationProfileCacheImpl) GetApplicationProfile(containerID string) *v1beta1.ApplicationProfile {
if s := ap.containerToSlug.Get(containerID); s != "" {
return ap.slugToAppProfile.Get(s)
Expand Down Expand Up @@ -110,15 +183,15 @@ func (ap *ApplicationProfileCacheImpl) AddHandler(ctx context.Context, obj runti
if pod, ok := obj.(*corev1.Pod); ok {
ap.addPod(pod)
} else if appProfile, ok := obj.(*v1beta1.ApplicationProfile); ok {
ap.addApplicationProfile(ctx, appProfile)
ap.addApplicationProfile(appProfile)
}
}

func (ap *ApplicationProfileCacheImpl) ModifyHandler(ctx context.Context, obj runtime.Object) {
if pod, ok := obj.(*corev1.Pod); ok {
ap.addPod(pod)
} else if appProfile, ok := obj.(*v1beta1.ApplicationProfile); ok {
ap.addApplicationProfile(ctx, appProfile)
ap.addApplicationProfile(appProfile)
}
}

Expand Down Expand Up @@ -213,57 +286,98 @@ func (ap *ApplicationProfileCacheImpl) removeContainer(containerID string) {
}

// ------------------ watch application profile methods -----------------------
func (ap *ApplicationProfileCacheImpl) addApplicationProfile(_ context.Context, obj runtime.Object) {
appProfile := obj.(*v1beta1.ApplicationProfile)
apName := objectcache.MetaUniqueName(appProfile)

apState := newApplicationProfileState(appProfile)
ap.slugToState.Set(apName, apState)

// the cache holds only completed application profiles.
// check if the application profile is completed
// if status was completed and now is not (e.g. mode changed from complete to partial), remove from cache
if apState.status != helpersv1.Completed {
if ap.slugToAppProfile.Has(apName) {
ap.slugToAppProfile.Delete(apName)
ap.allProfiles.Remove(apName)
}
return
}

// add to the cache
ap.allProfiles.Add(apName)

if ap.slugToContainers.Has(apName) {
// get the full application profile from the storage
// the watch only returns the metadata
// avoid thundering herd problem by adding a random delay
time.AfterFunc(utils.RandomDuration(ap.maxDelaySeconds, time.Second), func() {
ap.addFullApplicationProfile(appProfile, apName)
})
}
}

func (ap *ApplicationProfileCacheImpl) addFullApplicationProfile(appProfile *v1beta1.ApplicationProfile, apName string) {
fullAP, err := ap.getApplicationProfile(appProfile.GetNamespace(), appProfile.GetName())
if err != nil {
logger.L().Error("failed to get full application profile", helpers.Error(err))
return
}

// Check if there's a pending user-managed profile to merge
if ap.userManagedProfiles.Has(apName) {
userManagedProfile := ap.userManagedProfiles.Get(apName)
fullAP = ap.performMerge(fullAP, userManagedProfile)
// Clean up the user-managed profile after successful merge
ap.userManagedProfiles.Delete(apName)
logger.L().Debug("merged pending user-managed profile", helpers.String("name", apName))
}

ap.slugToAppProfile.Set(apName, fullAP)
for _, i := range ap.slugToContainers.Get(apName).ToSlice() {
ap.containerToSlug.Set(i, apName)
}
logger.L().Debug("added pod to application profile cache", helpers.String("name", apName))
}

func (ap *ApplicationProfileCacheImpl) performMerge(normalProfile, userManagedProfile *v1beta1.ApplicationProfile) *v1beta1.ApplicationProfile {
mergedProfile := normalProfile.DeepCopy()

// Merge spec
mergedProfile.Spec.Containers = ap.mergeContainers(mergedProfile.Spec.Containers, userManagedProfile.Spec.Containers)
mergedProfile.Spec.InitContainers = ap.mergeContainers(mergedProfile.Spec.InitContainers, userManagedProfile.Spec.InitContainers)
mergedProfile.Spec.EphemeralContainers = ap.mergeContainers(mergedProfile.Spec.EphemeralContainers, userManagedProfile.Spec.EphemeralContainers)

return mergedProfile
}

func (ap *ApplicationProfileCacheImpl) mergeContainers(normalContainers, userManagedContainers []v1beta1.ApplicationProfileContainer) []v1beta1.ApplicationProfileContainer {
if len(userManagedContainers) != len(normalContainers) {
// If the number of containers don't match, we can't merge
logger.L().Error("failed to merge user-managed profile with base profile",
helpers.Int("normalContainers len", len(normalContainers)),
helpers.Int("userManagedContainers len", len(userManagedContainers)),
helpers.String("reason", "number of containers don't match"))
return normalContainers
}

// Assuming the normalContainers are already in the correct Pod order
// We'll merge user containers at their corresponding positions
for i := range normalContainers {
for _, userContainer := range userManagedContainers {
if normalContainers[i].Name == userContainer.Name {
ap.mergeContainer(&normalContainers[i], &userContainer)
break
}
}
}
return normalContainers
}

func (ap *ApplicationProfileCacheImpl) mergeContainer(normalContainer, userContainer *v1beta1.ApplicationProfileContainer) {
normalContainer.Capabilities = append(normalContainer.Capabilities, userContainer.Capabilities...)
normalContainer.Execs = append(normalContainer.Execs, userContainer.Execs...)
normalContainer.Opens = append(normalContainer.Opens, userContainer.Opens...)
normalContainer.Syscalls = append(normalContainer.Syscalls, userContainer.Syscalls...)
normalContainer.Endpoints = append(normalContainer.Endpoints, userContainer.Endpoints...)
}

func (ap *ApplicationProfileCacheImpl) deleteApplicationProfile(obj runtime.Object) {
apName := objectcache.MetaUniqueName(obj.(metav1.Object))
ap.slugToAppProfile.Delete(apName)
ap.slugToState.Delete(apName)
ap.allProfiles.Remove(apName)
appProfile := obj.(*v1beta1.ApplicationProfile)
apName := objectcache.MetaUniqueName(appProfile)

logger.L().Info("deleted application profile from cache", helpers.String("uniqueSlug", apName))
if isUserManagedProfile(appProfile) {
// For user-managed profiles, we need to use the base name for cleanup
baseProfileName := strings.TrimPrefix(appProfile.GetName(), "ug-")
baseProfileUniqueName := objectcache.UniqueName(appProfile.GetNamespace(), baseProfileName)
ap.userManagedProfiles.Delete(baseProfileUniqueName)

logger.L().Debug("deleted user-managed profile from cache",
helpers.String("profileName", appProfile.GetName()),
helpers.String("baseProfile", baseProfileName))
} else {
// For normal profiles, clean up all related data
ap.slugToAppProfile.Delete(apName)
ap.slugToState.Delete(apName)
ap.allProfiles.Remove(apName)

// Log the deletion of normal profile
logger.L().Debug("deleted application profile from cache",
helpers.String("uniqueSlug", apName))
}

// Clean up any orphaned user-managed profiles
ap.cleanupOrphanedUserManagedProfiles()
}

func (ap *ApplicationProfileCacheImpl) getApplicationProfile(namespace, name string) (*v1beta1.ApplicationProfile, error) {
Expand Down Expand Up @@ -301,3 +415,27 @@ func getSlug(p *corev1.Pod) (string, error) {
}
return slug, nil
}

// Add cleanup method for any orphaned user-managed profiles
func (ap *ApplicationProfileCacheImpl) cleanupOrphanedUserManagedProfiles() {
// This could be called periodically or during certain operations
ap.userManagedProfiles.Range(func(key string, value *v1beta1.ApplicationProfile) bool {
if ap.slugToAppProfile.Has(key) {
// If base profile exists but merge didn't happen for some reason,
// attempt merge again and cleanup
if baseProfile := ap.slugToAppProfile.Get(key); baseProfile != nil {
mergedProfile := ap.performMerge(baseProfile, value)
ap.slugToAppProfile.Set(key, mergedProfile)
ap.userManagedProfiles.Delete(key)
logger.L().Debug("cleaned up orphaned user-managed profile", helpers.String("name", key))
}
}
return true
})
}

func isUserManagedProfile(appProfile *v1beta1.ApplicationProfile) bool {
return appProfile.Annotations != nil &&
appProfile.Annotations["kubescape.io/managed-by"] == "User" &&
strings.HasPrefix(appProfile.GetName(), "ug-")
}
Loading

0 comments on commit 0c6721c

Please sign in to comment.