Skip to content

Commit

Permalink
[release-1.2] only reconcile triggers that point to the MTChannelBrok…
Browse files Browse the repository at this point in the history
…er class (#6269)

* only reconcile triggers that point to the mtbroker class

* make sure to also filter on election change, via PromoteFilterFunc

Co-authored-by: Scott Nichols <n3wscott@chainguard.dev>
  • Loading branch information
knative-prow-robot and n3wscott authored Mar 16, 2022
1 parent 21d0654 commit dfc5fd8
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 24 deletions.
48 changes: 37 additions & 11 deletions pkg/reconciler/broker/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import (
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
apiseventing "knative.dev/eventing/pkg/apis/eventing"
eventing "knative.dev/eventing/pkg/apis/eventing/v1"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"
brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker"
triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"
v1 "knative.dev/eventing/pkg/client/listers/eventing/v1"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"
"knative.dev/eventing/pkg/duck"
"knative.dev/pkg/client/injection/ducks/duck/v1/source"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
Expand Down Expand Up @@ -63,20 +63,27 @@ func NewController(
triggerLister: triggerLister,
configmapLister: configmapInformer.Lister(),
}
impl := triggerreconciler.NewImpl(ctx, r)
impl := triggerreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
PromoteFilterFunc: filterTriggers(r.brokerLister),
}
})
r.impl = impl

r.sourceTracker = duck.NewListableTrackerFromTracker(ctx, source.Get, impl.Tracker)
r.uriResolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)

triggerInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
triggerInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: filterTriggers(r.brokerLister),
Handler: controller.HandleAll(impl.Enqueue),
})

// Filter Brokers and enqueue associated Triggers
brokerFilter := pkgreconciler.AnnotationFilterFunc(brokerreconciler.ClassAnnotationKey, eventing.MTChannelBrokerClassValue, false /*allowUnset*/)
brokerFilter := pkgreconciler.AnnotationFilterFunc(brokerreconciler.ClassAnnotationKey, apiseventing.MTChannelBrokerClassValue, false /*allowUnset*/)
brokerInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: brokerFilter,
Handler: controller.HandleAll(func(obj interface{}) {
if broker, ok := obj.(*eventingv1.Broker); ok {
if broker, ok := obj.(*eventing.Broker); ok {
for _, t := range getTriggersForBroker(logger, triggerLister, broker) {
impl.Enqueue(t)
}
Expand All @@ -86,20 +93,39 @@ func NewController(

// Reconcile Trigger when my Subscription changes
subscriptionInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&eventingv1.Trigger{}),
FilterFunc: controller.FilterController(&eventing.Trigger{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

return impl
}

// filterTriggers returns a function that returns true if the resource passed
// is a trigger pointing to a MTChannelBroker.
func filterTriggers(lister eventinglisters.BrokerLister) func(interface{}) bool {
return func(obj interface{}) bool {
trigger, ok := obj.(*eventing.Trigger)
if !ok {
return false
}

b, err := lister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker)
if err != nil {
return false
}

value, ok := b.GetAnnotations()[apiseventing.BrokerClassKey]
return ok && value == apiseventing.MTChannelBrokerClassValue
}
}

// getTriggersForBroker makes sure the object passed in is a Broker, and gets all
// the Triggers belonging to it. As there is no way to return failures in the
// Informers EventHandler, errors are logged, and an empty array is returned in case
// of failures.
func getTriggersForBroker(logger *zap.SugaredLogger, triggerLister v1.TriggerLister, broker *eventingv1.Broker) []*eventingv1.Trigger {
r := make([]*eventingv1.Trigger, 0)
selector := labels.SelectorFromSet(map[string]string{eventing.BrokerLabelKey: broker.Name})
func getTriggersForBroker(logger *zap.SugaredLogger, triggerLister eventinglisters.TriggerLister, broker *eventing.Broker) []*eventing.Trigger {
r := make([]*eventing.Trigger, 0)
selector := labels.SelectorFromSet(map[string]string{apiseventing.BrokerLabelKey: broker.Name})
triggers, err := triggerLister.Triggers(broker.Namespace).List(selector)
if err != nil {
logger.Warn("Failed to list triggers", zap.Any("broker", broker), zap.Error(err))
Expand Down
95 changes: 88 additions & 7 deletions pkg/reconciler/broker/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@ import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
v1 "knative.dev/eventing/pkg/apis/eventing/v1"
v1lister "knative.dev/eventing/pkg/client/listers/eventing/v1"

"k8s.io/apimachinery/pkg/runtime"

apiseventing "knative.dev/eventing/pkg/apis/eventing"
eventing "knative.dev/eventing/pkg/apis/eventing/v1"
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
v1lister "knative.dev/eventing/pkg/client/listers/eventing/v1"
testingv1 "knative.dev/eventing/pkg/reconciler/testing/v1"
"knative.dev/pkg/configmap"
logtesting "knative.dev/pkg/logging/testing"

. "knative.dev/pkg/reconciler/testing"

// Fake injection informers
Expand All @@ -49,6 +52,84 @@ func TestNew(t *testing.T) {
}
}

func TestFilterTriggers(t *testing.T) {
ctx, _ := SetupFakeContext(t)

tt := []struct {
name string
trigger interface{}
pass bool
brokers []*eventing.Broker
}{{
name: "unknown type",
trigger: &eventing.Broker{},
pass: false,
}, {
name: "non matching broker",
trigger: &eventing.Trigger{
Spec: eventing.TriggerSpec{
Broker: "does-not-exists",
},
},
pass: false,
}, {
name: "exiting matching broker",
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: apiseventing.MTChannelBrokerClassValue,
},
},
}},
pass: true,
}, {
name: "exiting non matching broker",
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: "some-other-broker",
},
},
}},
pass: false,
}}

for _, tc := range tt {
tc := tc
t.Run(tc.name, func(t *testing.T) {
brokerInformer := brokerinformer.Get(ctx)
for _, obj := range tc.brokers {
_ = brokerInformer.Informer().GetStore().Add(obj)
}
filter := filterTriggers(brokerInformer.Lister())
pass := filter(tc.trigger)
assert.Equal(t, tc.pass, pass)
})
}
}

func TestGetTriggersForBroker(t *testing.T) {
for _, tt := range []struct {
name string
Expand Down Expand Up @@ -91,7 +172,7 @@ func TestGetTriggersForBroker(t *testing.T) {

type TriggerListerFailer struct{}

func (failer *TriggerListerFailer) List(selector labels.Selector) (ret []*v1.Trigger, err error) {
func (failer *TriggerListerFailer) List(selector labels.Selector) (ret []*eventing.Trigger, err error) {
return nil, nil
}

Expand All @@ -103,12 +184,12 @@ type TriggerNamespaceListerFailer struct{}

// List lists all Triggers in the indexer.
// Objects returned here must be treated as read-only.
func (failer *TriggerNamespaceListerFailer) List(selector labels.Selector) (ret []*v1.Trigger, err error) {
func (failer *TriggerNamespaceListerFailer) List(selector labels.Selector) (ret []*eventing.Trigger, err error) {
return nil, fmt.Errorf("Inducing test failure for List")
}

// Triggers returns an object that can list and get Triggers.
func (failer *TriggerNamespaceListerFailer) Get(name string) (*v1.Trigger, error) {
func (failer *TriggerNamespaceListerFailer) Get(name string) (*eventing.Trigger, error) {
return nil, nil
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/reconciler/broker/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,6 @@ type Reconciler struct {

func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) pkgreconciler.Event {
logging.FromContext(ctx).Infow("Reconciling", zap.Any("Trigger", t))
t.Status.InitializeConditions()

if t.DeletionTimestamp != nil {
// Everything is cleaned up by the garbage collector.
return nil
}

b, err := r.brokerLister.Brokers(t.Namespace).Get(t.Spec.Broker)
if err != nil {
Expand Down

0 comments on commit dfc5fd8

Please sign in to comment.