From 45ed5d45dc91c9954e0203ceda36a1c3864b882e Mon Sep 17 00:00:00 2001 From: Jim Ryan Date: Mon, 14 Oct 2024 10:55:14 +0100 Subject: [PATCH] use NewInformerWithOptions and TypedRateLimitingInterface --- internal/externaldns/controller.go | 39 +++++++-------- internal/externaldns/handlers.go | 14 ++++-- internal/k8s/status_test.go | 79 ++++++++++++++++++++---------- 3 files changed, 82 insertions(+), 50 deletions(-) diff --git a/internal/externaldns/controller.go b/internal/externaldns/controller.go index 780083cb41..c82292fc67 100644 --- a/internal/externaldns/controller.go +++ b/internal/externaldns/controller.go @@ -14,6 +14,7 @@ import ( extdnslisters "github.com/nginxinc/kubernetes-ingress/pkg/client/listers/externaldns/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -31,7 +32,7 @@ const ( type ExtDNSController struct { sync SyncFn ctx context.Context - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[types.NamespacedName] recorder record.EventRecorder client k8s_nginx.Interface informerGroup map[string]*namespacedInformer @@ -60,9 +61,14 @@ type ExtDNSOpts struct { // NewController takes external dns config and return a new External DNS Controller. func NewController(opts *ExtDNSOpts) *ExtDNSController { ig := make(map[string]*namespacedInformer) + + rateLimiter := workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName]() + + queue := workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{Name: ControllerName}) + c := &ExtDNSController{ ctx: opts.context, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), + queue: queue, informerGroup: ig, recorder: opts.eventRecorder, client: opts.client, @@ -155,39 +161,31 @@ func (c *ExtDNSController) runWorker(ctx context.Context) { l := nl.LoggerFromContext(ctx) nl.Debugf(l, "processing items on the workqueue") for { - obj, shutdown := c.queue.Get() + key, shutdown := c.queue.Get() if shutdown { break } func() { - defer c.queue.Done(obj) - key, ok := obj.(string) - if !ok { - return - } - + defer c.queue.Done(key) if err := c.processItem(ctx, key); err != nil { nl.Debugf(l, "Re-queuing item due to error processing: %v", err) - c.queue.AddRateLimited(obj) + c.queue.AddRateLimited(key) return } nl.Debugf(l, "finished processing work item") - c.queue.Forget(obj) + c.queue.Forget(key) }() } } -func (c *ExtDNSController) processItem(ctx context.Context, key string) error { - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) - return err - } +func (c *ExtDNSController) processItem(ctx context.Context, key types.NamespacedName) error { + namespace := key.Namespace + name := key.Name l := nl.LoggerFromContext(ctx) var vs *conf_v1.VirtualServer nsi := getNamespacedInformer(namespace, c.informerGroup) - vs, err = nsi.vsLister.VirtualServers(namespace).Get(name) + vs, err := nsi.vsLister.VirtualServers(namespace).Get(name) // VS has been deleted if apierrors.IsNotFound(err) { @@ -201,7 +199,7 @@ func (c *ExtDNSController) processItem(ctx context.Context, key string) error { return c.sync(ctx, vs) } -func externalDNSHandler(queue workqueue.RateLimitingInterface) func(obj interface{}) { +func externalDNSHandler(queue workqueue.TypedRateLimitingInterface[types.NamespacedName]) func(obj interface{}) { return func(obj interface{}) { ep, ok := obj.(*extdns_v1.DNSEndpoint) if !ok { @@ -223,7 +221,8 @@ func externalDNSHandler(queue workqueue.RateLimitingInterface) func(obj interfac return } - queue.Add(ep.Namespace + "/" + ref.Name) + key := types.NamespacedName{Namespace: ep.Namespace, Name: ref.Name} + queue.Add(key) } } diff --git a/internal/externaldns/handlers.go b/internal/externaldns/handlers.go index f303e1cb3f..0ee24ea982 100644 --- a/internal/externaldns/handlers.go +++ b/internal/externaldns/handlers.go @@ -4,6 +4,8 @@ import ( "reflect" "time" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -16,23 +18,27 @@ var KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc // DefaultItemBasedRateLimiter returns a new rate limiter with base delay of 5 // seconds, max delay of 5 minutes. -func DefaultItemBasedRateLimiter() workqueue.RateLimiter { - return workqueue.NewItemExponentialFailureRateLimiter(time.Second*5, time.Minute*5) +func DefaultItemBasedRateLimiter() workqueue.TypedRateLimiter[types.NamespacedName] { + return workqueue.NewTypedItemExponentialFailureRateLimiter[types.NamespacedName](5*time.Second, 5*time.Minute) } // QueuingEventHandler is an implementation of cache.ResourceEventHandler that // simply queues objects that are added/updated/deleted. type QueuingEventHandler struct { - Queue workqueue.RateLimitingInterface + Queue workqueue.TypedRateLimitingInterface[types.NamespacedName] } // Enqueue adds a key for an object to the workqueue. func (q *QueuingEventHandler) Enqueue(obj interface{}) { - key, err := KeyFunc(obj) + accessor, err := meta.Accessor(obj) if err != nil { runtime.HandleError(err) return } + key := types.NamespacedName{ + Namespace: accessor.GetNamespace(), + Name: accessor.GetName(), + } q.Queue.Add(key) } diff --git a/internal/k8s/status_test.go b/internal/k8s/status_test.go index 3982186412..b7f6214052 100644 --- a/internal/k8s/status_test.go +++ b/internal/k8s/status_test.go @@ -6,6 +6,7 @@ import ( "log/slog" "reflect" "testing" + "time" "github.com/google/go-cmp/cmp" nic_glog "github.com/nginxinc/kubernetes-ingress/internal/logger/glog" @@ -94,16 +95,18 @@ func TestUpdateTransportServerStatusIgnoreNoChange(t *testing.T) { }, }) - tsLister, _ := cache.NewInformer( - cache.NewListWatchFromClient( - fakeClient.K8sV1().RESTClient(), - "transportservers", - "nginx-ingress", - fields.Everything(), - ), - &conf_v1.TransportServer{}, - 2, - nil, + tsLister, _ := cache.NewInformerWithOptions( + cache.InformerOptions{ + ListerWatcher: cache.NewListWatchFromClient( + fakeClient.K8sV1().RESTClient(), + "transportservers", + "nginx-ingress", + fields.Everything(), + ), + ObjectType: &conf_v1.TransportServer{}, + ResyncPeriod: 2 * time.Second, + Handler: nil, + }, ) err := tsLister.Add(ts) @@ -156,16 +159,18 @@ func TestUpdateTransportServerStatusMissingTransportServer(t *testing.T) { Items: []conf_v1.TransportServer{}, }) - tsLister, _ := cache.NewInformer( - cache.NewListWatchFromClient( - fakeClient.K8sV1().RESTClient(), - "transportservers", - "nginx-ingress", - fields.Everything(), - ), - &conf_v1.TransportServer{}, - 2, - nil, + tsLister, _ := cache.NewInformerWithOptions( + cache.InformerOptions{ + ListerWatcher: cache.NewListWatchFromClient( + fakeClient.K8sV1().RESTClient(), + "transportservers", + "nginx-ingress", + fields.Everything(), + ), + ObjectType: &conf_v1.TransportServer{}, + ResyncPeriod: 2 * time.Second, + Handler: nil, + }, ) nsi := make(map[string]*namespacedInformer) @@ -219,9 +224,20 @@ func TestStatusUpdateWithExternalStatusAndExternalService(t *testing.T) { }}, ) ingLister := storeToIngressLister{} - ingLister.Store, _ = cache.NewInformer( - cache.NewListWatchFromClient(fakeClient.NetworkingV1().RESTClient(), "ingresses", "nginx-ingress", fields.Everything()), - &networking.Ingress{}, 2, nil) + + ingLister.Store, _ = cache.NewInformerWithOptions( + cache.InformerOptions{ + ListerWatcher: cache.NewListWatchFromClient( + fakeClient.NetworkingV1().RESTClient(), + "ingresses", + "nginx-ingress", + fields.Everything(), + ), + ObjectType: &networking.Ingress{}, + ResyncPeriod: 2 * time.Second, + Handler: nil, + }, + ) err := ingLister.Store.Add(&ing) if err != nil { @@ -328,9 +344,20 @@ func TestStatusUpdateWithExternalStatusAndIngressLink(t *testing.T) { }}, ) ingLister := storeToIngressLister{} - ingLister.Store, _ = cache.NewInformer( - cache.NewListWatchFromClient(fakeClient.NetworkingV1().RESTClient(), "ingresses", "nginx-ingress", fields.Everything()), - &networking.Ingress{}, 2, nil) + + ingLister.Store, _ = cache.NewInformerWithOptions( + cache.InformerOptions{ + ListerWatcher: cache.NewListWatchFromClient( + fakeClient.NetworkingV1().RESTClient(), + "ingresses", + "nginx-ingress", + fields.Everything(), + ), + ObjectType: &networking.Ingress{}, + ResyncPeriod: 2 * time.Second, + Handler: nil, + }, + ) err := ingLister.Store.Add(&ing) if err != nil {