Skip to content

Commit

Permalink
Adjust the Endpoints Informer events. (#2779)
Browse files Browse the repository at this point in the history
There was a superfluous Endpoints event registered, which is leftover from the single-tenant Broker.

There was a missing global resync due to changes in the shared Endpoints, which prevents Brokers from becoming ready if they are reconciled before the shared deployment have become ready.
  • Loading branch information
mattmoor authored Mar 19, 2020
1 parent f356127 commit 12f2beb
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
7 changes: 5 additions & 2 deletions pkg/reconciler/mtbroker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ const (
// Name of the corev1.Events emitted from the Broker reconciliation process.
brokerReconcileError = "BrokerReconcileError"
brokerReconciled = "BrokerReconciled"

BrokerFilterName = "broker-filter"
BrokerIngressName = "broker-ingress"
)

type Reconciler struct {
Expand Down Expand Up @@ -162,15 +165,15 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) pkgr
b.Status.TriggerChannel = &chanMan.ref
b.Status.PropagateTriggerChannelReadiness(&triggerChan.Status)

filterEndpoints, err := r.endpointsLister.Endpoints(system.Namespace()).Get("broker-filter")
filterEndpoints, err := r.endpointsLister.Endpoints(system.Namespace()).Get(BrokerFilterName)
if err != nil {
logging.FromContext(ctx).Error("Problem getting endpoints for filter", zap.String("namespace", system.Namespace()), zap.Error(err))
b.Status.MarkFilterFailed("ServiceFailure", "%v", err)
return err
}
b.Status.PropagateFilterAvailability(filterEndpoints)

ingressEndpoints, err := r.endpointsLister.Endpoints(system.Namespace()).Get("broker-ingress")
ingressEndpoints, err := r.endpointsLister.Endpoints(system.Namespace()).Get(BrokerIngressName)
if err != nil {
logging.FromContext(ctx).Error("Problem getting endpoints for ingress", zap.String("namespace", system.Namespace()), zap.Error(err))
b.Status.MarkIngressFailed("ServiceFailure", "%v", err)
Expand Down
33 changes: 27 additions & 6 deletions pkg/reconciler/mtbroker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"knative.dev/pkg/controller"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
"knative.dev/pkg/system"
)

const (
Expand Down Expand Up @@ -77,16 +78,12 @@ func NewController(
r.addressableTracker = duck.NewListableTracker(ctx, addressable.Get, impl.EnqueueKey, controller.GetTrackerLease(ctx))
r.uriResolver = resolver.NewURIResolver(ctx, impl.EnqueueKey)

brokerFilter := pkgreconciler.AnnotationFilterFunc(brokerreconciler.ClassAnnotationKey, eventing.MTChannelBrokerClassValue, false /*allowUnset*/)
brokerInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: pkgreconciler.AnnotationFilterFunc(brokerreconciler.ClassAnnotationKey, eventing.MTChannelBrokerClassValue, false /*allowUnset*/),
FilterFunc: brokerFilter,
Handler: controller.HandleAll(impl.Enqueue),
})

endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: pkgreconciler.LabelExistsFilterFunc(eventing.BrokerLabelKey),
Handler: controller.HandleAll(impl.EnqueueLabelOfNamespaceScopedResource("" /*any namespace*/, eventing.BrokerLabelKey)),
})

// Reconcile Broker (which transitively reconciles the triggers), when Subscriptions
// that I own are changed.
subscriptionInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
Expand All @@ -109,5 +106,29 @@ func NewController(
},
))

// When the endpoints in our multi-tenant filter/ingress change, do a global resync.
// During installation, we might reconcile Brokers before our shared filter/ingress is
// ready, so when these endpoints change perform a global resync.
grCb := func(obj interface{}) {
// Since changes in the Filter/Ingress Service endpoints affect all the Broker objects,
// do a global resync.
r.Logger.Info("Doing a global resync due to endpoint changes in shared broker component")
impl.FilteredGlobalResync(brokerFilter, brokerInformer.Informer())
}
// Resync for the filter.
endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: pkgreconciler.ChainFilterFuncs(
pkgreconciler.NamespaceFilterFunc(system.Namespace()),
pkgreconciler.NameFilterFunc(BrokerFilterName)),
Handler: controller.HandleAll(grCb),
})
// Resync for the ingress.
endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: pkgreconciler.ChainFilterFuncs(
pkgreconciler.NamespaceFilterFunc(system.Namespace()),
pkgreconciler.NameFilterFunc(BrokerIngressName)),
Handler: controller.HandleAll(grCb),
})

return impl
}

0 comments on commit 12f2beb

Please sign in to comment.