From 4051c5d96eda2d073642f3d509a17846e1f44f1e Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Mon, 31 Aug 2020 14:02:08 +0200 Subject: [PATCH] Enqueue Triggers on Broker changes (#3966) (#3968) Signed-off-by: Pierangelo Di Pilato --- pkg/reconciler/mtbroker/trigger/controller.go | 27 +++++++++++++++++++ pkg/reconciler/mtbroker/trigger/trigger.go | 18 ------------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/pkg/reconciler/mtbroker/trigger/controller.go b/pkg/reconciler/mtbroker/trigger/controller.go index 3ed2cfd8503..c211aa7baa2 100644 --- a/pkg/reconciler/mtbroker/trigger/controller.go +++ b/pkg/reconciler/mtbroker/trigger/controller.go @@ -19,12 +19,16 @@ package mttrigger import ( "context" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/apis/eventing" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingclient "knative.dev/eventing/pkg/client/injection/client" brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker" triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger" subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription" + brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger" "knative.dev/eventing/pkg/duck" "knative.dev/pkg/client/injection/ducks/duck/v1/conditions" @@ -33,6 +37,7 @@ import ( "knative.dev/pkg/controller" "knative.dev/pkg/injection/clients/dynamicclient" "knative.dev/pkg/logging" + pkgreconciler "knative.dev/pkg/reconciler" "knative.dev/pkg/resolver" ) @@ -66,6 +71,28 @@ func NewController( triggerInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + // Filter Brokers and enqueue associated Triggers + brokerFilter := pkgreconciler.AnnotationFilterFunc(brokerreconciler.ClassAnnotationKey, eventing.MTChannelBrokerClassValue, false /*allowUnset*/) + brokerInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: brokerFilter, + Handler: controller.HandleAll(func(obj interface{}) { + + if broker, ok := obj.(*eventingv1.Broker); ok { + + selector := labels.SelectorFromSet(map[string]string{eventing.BrokerLabelKey: broker.Name}) + triggers, err := triggerInformer.Lister().Triggers(broker.Namespace).List(selector) + if err != nil { + logger.Warn("Failed to list triggers", zap.Any("broker", broker), zap.Error(err)) + return + } + + for _, trigger := range triggers { + impl.Enqueue(trigger) + } + } + }), + }) + // Reconcile Trigger when my Subscription changes subscriptionInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterControllerGK(eventingv1.Kind("Trigger")), diff --git a/pkg/reconciler/mtbroker/trigger/trigger.go b/pkg/reconciler/mtbroker/trigger/trigger.go index 956a4f3d438..b677c8e89dc 100644 --- a/pkg/reconciler/mtbroker/trigger/trigger.go +++ b/pkg/reconciler/mtbroker/trigger/trigger.go @@ -87,8 +87,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p // Everything is cleaned up by the garbage collector. return nil } - // Start tracking the broker - r.trackBroker(ctx, t) b, err := r.brokerLister.Brokers(t.Namespace).Get(t.Spec.Broker) if err != nil { @@ -152,22 +150,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p return nil } -func (r *Reconciler) trackBroker(ctx context.Context, t *eventingv1.Trigger) error { - trackKResource := r.kresourceTracker.TrackInNamespace(t) - brokerObjRef := corev1.ObjectReference{ - Kind: brokerGVK.Kind, - APIVersion: brokerGVK.GroupVersion().String(), - Name: t.Spec.Broker, - Namespace: t.Namespace, - } - - if err := trackKResource(brokerObjRef); err != nil { - return fmt.Errorf("Failed to track broker %q : %s", t.Spec.Broker, err) - } - logging.FromContext(ctx).Infow("Tracking:", zap.Any("Broker", brokerObjRef)) - return nil -} - // subscribeToBrokerChannel subscribes service 'svc' to the Broker's channels. func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1.Broker, t *eventingv1.Trigger, brokerTrigger *corev1.ObjectReference) (*messagingv1.Subscription, error) { recorder := controller.GetEventRecorder(ctx)