Skip to content

Commit

Permalink
Enqueue Triggers on Broker changes (#3966) (#3968)
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi authored Aug 31, 2020
1 parent 7be9b4e commit 4051c5d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 18 deletions.
27 changes: 27 additions & 0 deletions pkg/reconciler/mtbroker/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ package mttrigger
import (
"context"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"
brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker"
triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"
"knative.dev/eventing/pkg/duck"
"knative.dev/pkg/client/injection/ducks/duck/v1/conditions"
Expand All @@ -33,6 +37,7 @@ import (
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
)

Expand Down Expand Up @@ -66,6 +71,28 @@ func NewController(

triggerInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

// Filter Brokers and enqueue associated Triggers
brokerFilter := pkgreconciler.AnnotationFilterFunc(brokerreconciler.ClassAnnotationKey, eventing.MTChannelBrokerClassValue, false /*allowUnset*/)
brokerInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: brokerFilter,
Handler: controller.HandleAll(func(obj interface{}) {

if broker, ok := obj.(*eventingv1.Broker); ok {

selector := labels.SelectorFromSet(map[string]string{eventing.BrokerLabelKey: broker.Name})
triggers, err := triggerInformer.Lister().Triggers(broker.Namespace).List(selector)
if err != nil {
logger.Warn("Failed to list triggers", zap.Any("broker", broker), zap.Error(err))
return
}

for _, trigger := range triggers {
impl.Enqueue(trigger)
}
}
}),
})

// Reconcile Trigger when my Subscription changes
subscriptionInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterControllerGK(eventingv1.Kind("Trigger")),
Expand Down
18 changes: 0 additions & 18 deletions pkg/reconciler/mtbroker/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p
// Everything is cleaned up by the garbage collector.
return nil
}
// Start tracking the broker
r.trackBroker(ctx, t)

b, err := r.brokerLister.Brokers(t.Namespace).Get(t.Spec.Broker)
if err != nil {
Expand Down Expand Up @@ -152,22 +150,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p
return nil
}

func (r *Reconciler) trackBroker(ctx context.Context, t *eventingv1.Trigger) error {
trackKResource := r.kresourceTracker.TrackInNamespace(t)
brokerObjRef := corev1.ObjectReference{
Kind: brokerGVK.Kind,
APIVersion: brokerGVK.GroupVersion().String(),
Name: t.Spec.Broker,
Namespace: t.Namespace,
}

if err := trackKResource(brokerObjRef); err != nil {
return fmt.Errorf("Failed to track broker %q : %s", t.Spec.Broker, err)
}
logging.FromContext(ctx).Infow("Tracking:", zap.Any("Broker", brokerObjRef))
return nil
}

// subscribeToBrokerChannel subscribes service 'svc' to the Broker's channels.
func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1.Broker, t *eventingv1.Trigger, brokerTrigger *corev1.ObjectReference) (*messagingv1.Subscription, error) {
recorder := controller.GetEventRecorder(ctx)
Expand Down

0 comments on commit 4051c5d

Please sign in to comment.