From f9a35d884f2f36400be4af0c1848626a50048574 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Thu, 30 Nov 2023 15:09:22 +0100 Subject: [PATCH] NETOBSERV-1404 mitigate narrowcache weakness on missed events narrowcache might contain outdated data in case of missed events, add a mechanism to invalidate entries on errors --- pkg/narrowcache/client.go | 54 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/pkg/narrowcache/client.go b/pkg/narrowcache/client.go index 6346cff24..2d67da98a 100644 --- a/pkg/narrowcache/client.go +++ b/pkg/narrowcache/client.go @@ -9,6 +9,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/workqueue" @@ -230,3 +231,56 @@ func (c *Client) GetSource(ctx context.Context, obj client.Object) (source.Sourc }, }, nil } + +func (c *Client) clearEntry(obj client.Object) { + c.wmut.Lock() + defer c.wmut.Unlock() + + key := types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()} + gvk, _ := c.GroupVersionKindFor(obj) + strGVK := gvk.String() + if _, managed := c.watchedGVKs[strGVK]; managed { + strGVK := gvk.String() + objKey := strGVK + "|" + key.String() + delete(c.watchedObjects, objKey) + } +} + +func (c *Client) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + if err := c.Client.Create(ctx, obj, opts...); err != nil { + // might be due to an outdated cache, clear the corresponding entry + log.FromContext(ctx). + WithName("narrowcache"). + WithValues("name", obj.GetName(), "namespace", obj.GetNamespace()). + Info("Invalidating cache entry") + c.clearEntry(obj) + return err + } + return nil +} + +func (c *Client) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { + if err := c.Client.Delete(ctx, obj, opts...); err != nil { + // might be due to an outdated cache, clear the corresponding entry + log.FromContext(ctx). + WithName("narrowcache"). + WithValues("name", obj.GetName(), "namespace", obj.GetNamespace()). + Info("Invalidating cache entry") + c.clearEntry(obj) + return err + } + return nil +} + +func (c *Client) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + if err := c.Client.Update(ctx, obj, opts...); err != nil { + // might be due to an outdated cache, clear the corresponding entry + log.FromContext(ctx). + WithName("narrowcache"). + WithValues("name", obj.GetName(), "namespace", obj.GetNamespace()). + Info("Invalidating cache entry") + c.clearEntry(obj) + return err + } + return nil +}