Skip to content

Commit

Permalink
use NewInformerWithOptions and TypedRateLimitingInterface
Browse files Browse the repository at this point in the history
  • Loading branch information
Jim Ryan committed Oct 14, 2024
1 parent d3f1246 commit 45ed5d4
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 50 deletions.
39 changes: 19 additions & 20 deletions internal/externaldns/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
extdnslisters "github.com/nginxinc/kubernetes-ingress/pkg/client/listers/externaldns/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
Expand All @@ -31,7 +32,7 @@ const (
type ExtDNSController struct {
sync SyncFn
ctx context.Context
queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[types.NamespacedName]
recorder record.EventRecorder
client k8s_nginx.Interface
informerGroup map[string]*namespacedInformer
Expand Down Expand Up @@ -60,9 +61,14 @@ type ExtDNSOpts struct {
// NewController takes external dns config and return a new External DNS Controller.
func NewController(opts *ExtDNSOpts) *ExtDNSController {
ig := make(map[string]*namespacedInformer)

rateLimiter := workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName]()

queue := workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{Name: ControllerName})

c := &ExtDNSController{
ctx: opts.context,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
queue: queue,
informerGroup: ig,
recorder: opts.eventRecorder,
client: opts.client,
Expand Down Expand Up @@ -155,39 +161,31 @@ func (c *ExtDNSController) runWorker(ctx context.Context) {
l := nl.LoggerFromContext(ctx)
nl.Debugf(l, "processing items on the workqueue")
for {
obj, shutdown := c.queue.Get()
key, shutdown := c.queue.Get()
if shutdown {
break
}

func() {
defer c.queue.Done(obj)
key, ok := obj.(string)
if !ok {
return
}

defer c.queue.Done(key)
if err := c.processItem(ctx, key); err != nil {
nl.Debugf(l, "Re-queuing item due to error processing: %v", err)
c.queue.AddRateLimited(obj)
c.queue.AddRateLimited(key)
return
}
nl.Debugf(l, "finished processing work item")
c.queue.Forget(obj)
c.queue.Forget(key)
}()
}
}

func (c *ExtDNSController) processItem(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return err
}
func (c *ExtDNSController) processItem(ctx context.Context, key types.NamespacedName) error {
namespace := key.Namespace
name := key.Name
l := nl.LoggerFromContext(ctx)
var vs *conf_v1.VirtualServer
nsi := getNamespacedInformer(namespace, c.informerGroup)
vs, err = nsi.vsLister.VirtualServers(namespace).Get(name)
vs, err := nsi.vsLister.VirtualServers(namespace).Get(name)

// VS has been deleted
if apierrors.IsNotFound(err) {
Expand All @@ -201,7 +199,7 @@ func (c *ExtDNSController) processItem(ctx context.Context, key string) error {
return c.sync(ctx, vs)
}

func externalDNSHandler(queue workqueue.RateLimitingInterface) func(obj interface{}) {
func externalDNSHandler(queue workqueue.TypedRateLimitingInterface[types.NamespacedName]) func(obj interface{}) {
return func(obj interface{}) {
ep, ok := obj.(*extdns_v1.DNSEndpoint)
if !ok {
Expand All @@ -223,7 +221,8 @@ func externalDNSHandler(queue workqueue.RateLimitingInterface) func(obj interfac
return
}

queue.Add(ep.Namespace + "/" + ref.Name)
key := types.NamespacedName{Namespace: ep.Namespace, Name: ref.Name}
queue.Add(key)
}
}

Expand Down
14 changes: 10 additions & 4 deletions internal/externaldns/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"reflect"
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand All @@ -16,23 +18,27 @@ var KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc

// DefaultItemBasedRateLimiter returns a new rate limiter with base delay of 5
// seconds, max delay of 5 minutes.
func DefaultItemBasedRateLimiter() workqueue.RateLimiter {
return workqueue.NewItemExponentialFailureRateLimiter(time.Second*5, time.Minute*5)
func DefaultItemBasedRateLimiter() workqueue.TypedRateLimiter[types.NamespacedName] {
return workqueue.NewTypedItemExponentialFailureRateLimiter[types.NamespacedName](5*time.Second, 5*time.Minute)
}

// QueuingEventHandler is an implementation of cache.ResourceEventHandler that
// simply queues objects that are added/updated/deleted.
type QueuingEventHandler struct {
Queue workqueue.RateLimitingInterface
Queue workqueue.TypedRateLimitingInterface[types.NamespacedName]
}

// Enqueue adds a key for an object to the workqueue.
func (q *QueuingEventHandler) Enqueue(obj interface{}) {
key, err := KeyFunc(obj)
accessor, err := meta.Accessor(obj)
if err != nil {
runtime.HandleError(err)
return
}
key := types.NamespacedName{
Namespace: accessor.GetNamespace(),
Name: accessor.GetName(),
}
q.Queue.Add(key)
}

Expand Down
79 changes: 53 additions & 26 deletions internal/k8s/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"reflect"
"testing"
"time"

"github.com/google/go-cmp/cmp"
nic_glog "github.com/nginxinc/kubernetes-ingress/internal/logger/glog"
Expand Down Expand Up @@ -94,16 +95,18 @@ func TestUpdateTransportServerStatusIgnoreNoChange(t *testing.T) {
},
})

tsLister, _ := cache.NewInformer(
cache.NewListWatchFromClient(
fakeClient.K8sV1().RESTClient(),
"transportservers",
"nginx-ingress",
fields.Everything(),
),
&conf_v1.TransportServer{},
2,
nil,
tsLister, _ := cache.NewInformerWithOptions(
cache.InformerOptions{
ListerWatcher: cache.NewListWatchFromClient(
fakeClient.K8sV1().RESTClient(),
"transportservers",
"nginx-ingress",
fields.Everything(),
),
ObjectType: &conf_v1.TransportServer{},
ResyncPeriod: 2 * time.Second,
Handler: nil,
},
)

err := tsLister.Add(ts)
Expand Down Expand Up @@ -156,16 +159,18 @@ func TestUpdateTransportServerStatusMissingTransportServer(t *testing.T) {
Items: []conf_v1.TransportServer{},
})

tsLister, _ := cache.NewInformer(
cache.NewListWatchFromClient(
fakeClient.K8sV1().RESTClient(),
"transportservers",
"nginx-ingress",
fields.Everything(),
),
&conf_v1.TransportServer{},
2,
nil,
tsLister, _ := cache.NewInformerWithOptions(
cache.InformerOptions{
ListerWatcher: cache.NewListWatchFromClient(
fakeClient.K8sV1().RESTClient(),
"transportservers",
"nginx-ingress",
fields.Everything(),
),
ObjectType: &conf_v1.TransportServer{},
ResyncPeriod: 2 * time.Second,
Handler: nil,
},
)

nsi := make(map[string]*namespacedInformer)
Expand Down Expand Up @@ -219,9 +224,20 @@ func TestStatusUpdateWithExternalStatusAndExternalService(t *testing.T) {
}},
)
ingLister := storeToIngressLister{}
ingLister.Store, _ = cache.NewInformer(
cache.NewListWatchFromClient(fakeClient.NetworkingV1().RESTClient(), "ingresses", "nginx-ingress", fields.Everything()),
&networking.Ingress{}, 2, nil)

ingLister.Store, _ = cache.NewInformerWithOptions(
cache.InformerOptions{
ListerWatcher: cache.NewListWatchFromClient(
fakeClient.NetworkingV1().RESTClient(),
"ingresses",
"nginx-ingress",
fields.Everything(),
),
ObjectType: &networking.Ingress{},
ResyncPeriod: 2 * time.Second,
Handler: nil,
},
)

err := ingLister.Store.Add(&ing)
if err != nil {
Expand Down Expand Up @@ -328,9 +344,20 @@ func TestStatusUpdateWithExternalStatusAndIngressLink(t *testing.T) {
}},
)
ingLister := storeToIngressLister{}
ingLister.Store, _ = cache.NewInformer(
cache.NewListWatchFromClient(fakeClient.NetworkingV1().RESTClient(), "ingresses", "nginx-ingress", fields.Everything()),
&networking.Ingress{}, 2, nil)

ingLister.Store, _ = cache.NewInformerWithOptions(
cache.InformerOptions{
ListerWatcher: cache.NewListWatchFromClient(
fakeClient.NetworkingV1().RESTClient(),
"ingresses",
"nginx-ingress",
fields.Everything(),
),
ObjectType: &networking.Ingress{},
ResyncPeriod: 2 * time.Second,
Handler: nil,
},
)

err := ingLister.Store.Add(&ing)
if err != nil {
Expand Down

0 comments on commit 45ed5d4

Please sign in to comment.