From 0e0bccfc6f907284314facf6e458638548a03b76 Mon Sep 17 00:00:00 2001 From: Matt Moore Date: Tue, 17 Mar 2020 10:20:48 -0700 Subject: [PATCH] Make Broker availability based on the Endpoints not Deployments. Today the fact that the Broker lifecycle stuff propagates *Deployment* status for filter/ingress leaks implementation details into the API types. An example of where this leakage is bas would be if I were to run a multi-tenant broker (#2760) as something other than a Deployment. This change corrects one small part of this leaky interface by shifting from Deployment to Endpoints as the canonical resource for establishing readiness. This is also slightly more correct as it assesses not just whether the Deployment is ready, but whether that information has propagated to the Service / Endpoints. --- .../core/roles/controller-clusterroles.yaml | 1 + pkg/apis/duck/lifecycle_helper.go | 11 ++ .../eventing/v1alpha1/broker_lifecycle.go | 18 +-- .../v1alpha1/broker_lifecycle_test.go | 17 ++- pkg/apis/eventing/v1alpha1/test_helper.go | 44 +++--- pkg/reconciler/broker/broker.go | 50 +++---- pkg/reconciler/broker/broker_test.go | 51 +++++-- pkg/reconciler/broker/controller.go | 8 ++ pkg/reconciler/broker/controller_test.go | 1 + pkg/reconciler/broker/resources/filter.go | 133 ++++++++---------- pkg/reconciler/broker/resources/ingress.go | 131 ++++++++--------- pkg/reconciler/broker/trigger.go | 7 +- pkg/reconciler/testing/broker.go | 8 +- pkg/reconciler/testing/endpoints.go | 67 +++++++++ 14 files changed, 316 insertions(+), 231 deletions(-) create mode 100644 pkg/reconciler/testing/endpoints.go diff --git a/config/core/roles/controller-clusterroles.yaml b/config/core/roles/controller-clusterroles.yaml index eb97da57e94..a8453b364e2 100644 --- a/config/core/roles/controller-clusterroles.yaml +++ b/config/core/roles/controller-clusterroles.yaml @@ -26,6 +26,7 @@ rules: - "secrets" - "configmaps" - "services" + - "endpoints" - "events" - "serviceaccounts" verbs: &everything diff --git a/pkg/apis/duck/lifecycle_helper.go b/pkg/apis/duck/lifecycle_helper.go index c1b06b2de35..98888bbb0e4 100644 --- a/pkg/apis/duck/lifecycle_helper.go +++ b/pkg/apis/duck/lifecycle_helper.go @@ -18,6 +18,7 @@ package duck import ( appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" ) // DeploymentIsAvailable determines if the provided deployment is available. Note that if it cannot @@ -31,3 +32,13 @@ func DeploymentIsAvailable(d *appsv1.DeploymentStatus, def bool) bool { } return def } + +// EndpointsAreAvailable determines if the provided Endpoints are available. +func EndpointsAreAvailable(ep *corev1.Endpoints) bool { + for _, subset := range ep.Subsets { + if len(subset.Addresses) > 0 { + return true + } + } + return false +} diff --git a/pkg/apis/eventing/v1alpha1/broker_lifecycle.go b/pkg/apis/eventing/v1alpha1/broker_lifecycle.go index 54e4c23d559..d99fabf8867 100644 --- a/pkg/apis/eventing/v1alpha1/broker_lifecycle.go +++ b/pkg/apis/eventing/v1alpha1/broker_lifecycle.go @@ -17,9 +17,9 @@ limitations under the License. package v1alpha1 import ( - appsv1 "k8s.io/api/apps/v1" "knative.dev/pkg/apis" + corev1 "k8s.io/api/core/v1" "knative.dev/eventing/pkg/apis/duck" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" ) @@ -63,13 +63,11 @@ func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interfa brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) } -func (bs *BrokerStatus) PropagateIngressDeploymentAvailability(d *appsv1.Deployment) { - if duck.DeploymentIsAvailable(&d.Status, true) { +func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) { + if duck.EndpointsAreAvailable(ep) { brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress) } else { - // I don't know how to propagate the status well, so just give the name of the Deployment - // for now. - bs.MarkIngressFailed("DeploymentUnavailable", "The Deployment '%s' is unavailable.", d.Name) + bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) } } @@ -91,13 +89,11 @@ func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interfac brokerCondSet.Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) } -func (bs *BrokerStatus) PropagateFilterDeploymentAvailability(d *appsv1.Deployment) { - if duck.DeploymentIsAvailable(&d.Status, true) { +func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) { + if duck.EndpointsAreAvailable(ep) { brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter) } else { - // I don't know how to propagate the status well, so just give the name of the Deployment - // for now. - bs.MarkFilterFailed("DeploymentUnavailable", "The Deployment '%s' is unavailable.", d.Name) + bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) } } diff --git a/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go b/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go index 981c57b9213..cdc385d7247 100644 --- a/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go +++ b/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go @@ -23,7 +23,6 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp" - v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" @@ -316,13 +315,13 @@ func TestBrokerIsReady(t *testing.T) { t.Run(testName, func(t *testing.T) { bs := &BrokerStatus{} if test.markIngressReady != nil { - var d *v1.Deployment + var ep *corev1.Endpoints if *test.markIngressReady { - d = TestHelper.AvailableDeployment() + ep = TestHelper.AvailableEndpoints() } else { - d = TestHelper.UnavailableDeployment() + ep = TestHelper.UnavailableEndpoints() } - bs.PropagateIngressDeploymentAvailability(d) + bs.PropagateIngressAvailability(ep) } if test.markTriggerChannelReady != nil { var c *duckv1alpha1.ChannelableStatus @@ -334,13 +333,13 @@ func TestBrokerIsReady(t *testing.T) { bs.PropagateTriggerChannelReadiness(c) } if test.markFilterReady != nil { - var d *v1.Deployment + var ep *corev1.Endpoints if *test.markFilterReady { - d = TestHelper.AvailableDeployment() + ep = TestHelper.AvailableEndpoints() } else { - d = TestHelper.UnavailableDeployment() + ep = TestHelper.UnavailableEndpoints() } - bs.PropagateFilterDeploymentAvailability(d) + bs.PropagateFilterAvailability(ep) } bs.SetAddress(test.address) diff --git a/pkg/apis/eventing/v1alpha1/test_helper.go b/pkg/apis/eventing/v1alpha1/test_helper.go index b395d52c562..93ea443eb1c 100644 --- a/pkg/apis/eventing/v1alpha1/test_helper.go +++ b/pkg/apis/eventing/v1alpha1/test_helper.go @@ -17,7 +17,7 @@ limitations under the License. package v1alpha1 import ( - v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "knative.dev/pkg/apis" pkgduckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1" @@ -68,9 +68,9 @@ func (testHelper) FalseSubscriptionStatus() *messagingv1alpha1.SubscriptionStatu func (t testHelper) ReadyBrokerStatus() *BrokerStatus { bs := &BrokerStatus{} - bs.PropagateIngressDeploymentAvailability(t.AvailableDeployment()) + bs.PropagateIngressAvailability(t.AvailableEndpoints()) bs.PropagateTriggerChannelReadiness(t.ReadyChannelStatus()) - bs.PropagateFilterDeploymentAvailability(t.AvailableDeployment()) + bs.PropagateFilterAvailability(t.AvailableEndpoints()) bs.SetAddress(&apis.URL{Scheme: "http", Host: "foo"}) return bs } @@ -99,26 +99,24 @@ func (t testHelper) ReadyTriggerStatus() *TriggerStatus { return ts } -func (testHelper) UnavailableDeployment() *v1.Deployment { - d := &v1.Deployment{} - d.Name = "unavailable" - d.Status.Conditions = []v1.DeploymentCondition{ - { - Type: v1.DeploymentAvailable, - Status: "False", - }, - } - return d +func (t testHelper) UnavailableEndpoints() *corev1.Endpoints { + ep := &corev1.Endpoints{} + ep.Name = "unavailable" + ep.Subsets = []corev1.EndpointSubset{{ + NotReadyAddresses: []corev1.EndpointAddress{{ + IP: "127.0.0.1", + }}, + }} + return ep } -func (t testHelper) AvailableDeployment() *v1.Deployment { - d := t.UnavailableDeployment() - d.Name = "available" - d.Status.Conditions = []v1.DeploymentCondition{ - { - Type: v1.DeploymentAvailable, - Status: "True", - }, - } - return d +func (t testHelper) AvailableEndpoints() *corev1.Endpoints { + ep := &corev1.Endpoints{} + ep.Name = "available" + ep.Subsets = []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{ + IP: "127.0.0.1", + }}, + }} + return ep } diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index ab8a2f32fed..22b8cc1d0fd 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -34,6 +34,7 @@ import ( appsv1listers "k8s.io/client-go/listers/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" "knative.dev/pkg/apis" + "knative.dev/pkg/kmeta" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/eventing/pkg/apis/eventing" @@ -64,6 +65,7 @@ type Reconciler struct { // listers index properties about resources brokerLister eventinglisters.BrokerLister serviceLister corev1listers.ServiceLister + endpointsLister corev1listers.EndpointsLister deploymentLister appsv1listers.DeploymentLister subscriptionLister messaginglisters.SubscriptionLister triggerLister eventinglisters.TriggerLister @@ -124,7 +126,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *v1alpha1.Broker) pkgr return err } -func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, pkgreconciler.Event) { +func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (kmeta.Accessor, pkgreconciler.Event) { logging.FromContext(ctx).Debug("Reconciling", zap.Any("Broker", b)) b.Status.InitializeConditions() b.Status.ObservedGeneration = b.Generation @@ -176,42 +178,41 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (*co b.Status.TriggerChannel = &chanMan.ref b.Status.PropagateTriggerChannelReadiness(&triggerChan.Status) - filterDeployment, err := r.reconcileFilterDeployment(ctx, b) - if err != nil { + if err := r.reconcileFilterDeployment(ctx, b); err != nil { logging.FromContext(ctx).Error("Problem reconciling filter Deployment", zap.Error(err)) b.Status.MarkFilterFailed("DeploymentFailure", "%v", err) return nil, err } - filterSvc, err := r.reconcileFilterService(ctx, b) + filterEndpoints, err := r.reconcileFilterService(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling filter Service", zap.Error(err)) b.Status.MarkFilterFailed("ServiceFailure", "%v", err) return nil, err } - b.Status.PropagateFilterDeploymentAvailability(filterDeployment) + b.Status.PropagateFilterAvailability(filterEndpoints) - ingressDeployment, err := r.reconcileIngressDeployment(ctx, b, triggerChan) - if err != nil { + if err := r.reconcileIngressDeployment(ctx, b, triggerChan); err != nil { logging.FromContext(ctx).Error("Problem reconciling ingress Deployment", zap.Error(err)) b.Status.MarkIngressFailed("DeploymentFailure", "%v", err) return nil, err } - svc, err := r.reconcileIngressService(ctx, b) + ingressEndpoints, err := r.reconcileIngressService(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling ingress Service", zap.Error(err)) b.Status.MarkIngressFailed("ServiceFailure", "%v", err) return nil, err } - b.Status.PropagateIngressDeploymentAvailability(ingressDeployment) + // TODO(mattmoor): Use Endpoints + b.Status.PropagateIngressAvailability(ingressEndpoints) b.Status.SetAddress(&apis.URL{ Scheme: "http", - Host: names.ServiceHostName(svc.Name, svc.Namespace), + Host: names.ServiceHostName(ingressEndpoints.GetName(), ingressEndpoints.GetNamespace()), }) // So, at this point the Broker is ready and everything should be solid // for the triggers to act upon. - return filterSvc, nil + return filterEndpoints, nil } type channelTemplate struct { @@ -292,7 +293,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, b *v1alpha1.Broker) pkgre } // reconcileFilterDeployment reconciles Broker's 'b' filter deployment. -func (r *Reconciler) reconcileFilterDeployment(ctx context.Context, b *v1alpha1.Broker) (*v1.Deployment, error) { +func (r *Reconciler) reconcileFilterDeployment(ctx context.Context, b *v1alpha1.Broker) error { expected := resources.MakeFilterDeployment(&resources.FilterArgs{ Broker: b, Image: r.filterImage, @@ -302,7 +303,7 @@ func (r *Reconciler) reconcileFilterDeployment(ctx context.Context, b *v1alpha1. } // reconcileFilterService reconciles Broker's 'b' filter service. -func (r *Reconciler) reconcileFilterService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, error) { +func (r *Reconciler) reconcileFilterService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Endpoints, error) { expected := resources.MakeFilterService(b) return r.reconcileService(ctx, expected) } @@ -356,16 +357,15 @@ func TriggerChannelLabels(brokerName string) map[string]string { } // reconcileDeployment reconciles the K8s Deployment 'd'. -func (r *Reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment) (*v1.Deployment, error) { +func (r *Reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment) error { current, err := r.deploymentLister.Deployments(d.Namespace).Get(d.Name) if apierrs.IsNotFound(err) { current, err = r.KubeClientSet.AppsV1().Deployments(d.Namespace).Create(d) if err != nil { - return nil, err + return err } - return current, nil } else if err != nil { - return nil, err + return err } if !equality.Semantic.DeepDerivative(d.Spec, current.Spec) { @@ -374,21 +374,20 @@ func (r *Reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment) desired.Spec = d.Spec current, err = r.KubeClientSet.AppsV1().Deployments(current.Namespace).Update(desired) if err != nil { - return nil, err + return err } } - return current, nil + return nil } // reconcileService reconciles the K8s Service 'svc'. -func (r *Reconciler) reconcileService(ctx context.Context, svc *corev1.Service) (*corev1.Service, error) { +func (r *Reconciler) reconcileService(ctx context.Context, svc *corev1.Service) (*corev1.Endpoints, error) { current, err := r.serviceLister.Services(svc.Namespace).Get(svc.Name) if apierrs.IsNotFound(err) { current, err = r.KubeClientSet.CoreV1().Services(svc.Namespace).Create(svc) if err != nil { return nil, err } - return current, nil } else if err != nil { return nil, err } @@ -405,11 +404,12 @@ func (r *Reconciler) reconcileService(ctx context.Context, svc *corev1.Service) return nil, err } } - return current, nil + + return r.endpointsLister.Endpoints(svc.Namespace).Get(svc.Name) } // reconcileIngressDeploymentCRD reconciles the Ingress Deployment for a CRD backed channel. -func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1.Broker, c *duckv1alpha1.Channelable) (*v1.Deployment, error) { +func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1.Broker, c *duckv1alpha1.Channelable) error { expected := resources.MakeIngressDeployment(&resources.IngressArgs{ Broker: b, Image: r.ingressImage, @@ -420,13 +420,13 @@ func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1 } // reconcileIngressService reconciles the Ingress Service. -func (r *Reconciler) reconcileIngressService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, error) { +func (r *Reconciler) reconcileIngressService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Endpoints, error) { expected := resources.MakeIngressService(b) return r.reconcileService(ctx, expected) } // reconcileTriggers reconciles the Triggers that are pointed to this broker -func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1alpha1.Broker, filterSvc *corev1.Service) error { +func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1alpha1.Broker, filterSvc kmeta.Accessor) error { // TODO: Figure out the labels stuff... If webhook does it, we can filter like this... // Find all the Triggers that have been labeled as belonging to me diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index 0c144e5a22d..403e3feb4ae 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -483,6 +483,9 @@ func TestReconcile(t *testing.T) { WithServiceOwnerReferences(ownerReferences()), WithServiceLabels(resources.FilterLabels(brokerName)), WithServicePorts(servicePorts(8080))), + NewEndpoints(filterServiceName, testNS, + WithEndpointsLabels(resources.FilterLabels(brokerName)), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), }, WithReactors: []clientgotesting.ReactionFunc{ InduceFailure("create", "deployments"), @@ -501,7 +504,7 @@ func TestReconcile(t *testing.T) { WithBrokerChannel(channel()), WithInitBrokerConditions, WithTriggerChannelReady(), - WithFilterDeploymentAvailable(), + WithFilterAvailable(), WithBrokerTriggerChannel(createTriggerChannelRef()), WithIngressFailed("DeploymentFailure", "inducing failure for create deployments")), }}, @@ -533,6 +536,9 @@ func TestReconcile(t *testing.T) { WithServiceOwnerReferences(ownerReferences()), WithServiceLabels(resources.FilterLabels(brokerName)), WithServicePorts(servicePorts(8080))), + NewEndpoints(filterServiceName, testNS, + WithEndpointsLabels(resources.FilterLabels(brokerName)), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), NewDeployment(ingressDeploymentName, testNS, WithDeploymentOwnerReferences(ownerReferences()), WithDeploymentLabels(resources.IngressLabels(brokerName)), @@ -557,7 +563,7 @@ func TestReconcile(t *testing.T) { WithBrokerGeneration(brokerGeneration), WithBrokerStatusObservedGeneration(brokerGeneration), WithTriggerChannelReady(), - WithFilterDeploymentAvailable(), + WithFilterAvailable(), WithBrokerTriggerChannel(createTriggerChannelRef()), WithIngressFailed("DeploymentFailure", "inducing failure for update deployments")), }}, @@ -587,6 +593,9 @@ func TestReconcile(t *testing.T) { WithServiceOwnerReferences(ownerReferences()), WithServiceLabels(resources.FilterLabels(brokerName)), WithServicePorts(servicePorts(8080))), + NewEndpoints(filterServiceName, testNS, + WithEndpointsLabels(resources.FilterLabels(brokerName)), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), NewDeployment(ingressDeploymentName, testNS, WithDeploymentOwnerReferences(ownerReferences()), WithDeploymentLabels(resources.IngressLabels(brokerName)), @@ -608,7 +617,7 @@ func TestReconcile(t *testing.T) { WithBrokerChannel(channel()), WithInitBrokerConditions, WithTriggerChannelReady(), - WithFilterDeploymentAvailable(), + WithFilterAvailable(), WithBrokerTriggerChannel(createTriggerChannelRef()), WithIngressFailed("ServiceFailure", "inducing failure for create services")), }}, @@ -638,6 +647,9 @@ func TestReconcile(t *testing.T) { WithServiceOwnerReferences(ownerReferences()), WithServiceLabels(resources.FilterLabels(brokerName)), WithServicePorts(servicePorts(8080))), + NewEndpoints(filterServiceName, testNS, + WithEndpointsLabels(resources.FilterLabels(brokerName)), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), NewDeployment(ingressDeploymentName, testNS, WithDeploymentOwnerReferences(ownerReferences()), WithDeploymentLabels(resources.IngressLabels(brokerName)), @@ -663,7 +675,7 @@ func TestReconcile(t *testing.T) { WithBrokerChannel(channel()), WithInitBrokerConditions, WithTriggerChannelReady(), - WithFilterDeploymentAvailable(), + WithFilterAvailable(), WithBrokerTriggerChannel(createTriggerChannelRef()), WithIngressFailed("ServiceFailure", "inducing failure for update services")), }}, @@ -693,6 +705,9 @@ func TestReconcile(t *testing.T) { WithServiceOwnerReferences(ownerReferences()), WithServiceLabels(resources.FilterLabels(brokerName)), WithServicePorts(servicePorts(8080))), + NewEndpoints(filterServiceName, testNS, + WithEndpointsLabels(resources.FilterLabels(brokerName)), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), NewDeployment(ingressDeploymentName, testNS, WithDeploymentOwnerReferences(ownerReferences()), WithDeploymentLabels(resources.IngressLabels(brokerName)), @@ -702,6 +717,9 @@ func TestReconcile(t *testing.T) { WithServiceOwnerReferences(ownerReferences()), WithServiceLabels(resources.IngressLabels(brokerName)), WithServicePorts(servicePorts(8080))), + NewEndpoints(ingressServiceName, testNS, + WithEndpointsLabels(resources.IngressLabels(brokerName)), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, @@ -718,8 +736,7 @@ func TestReconcile(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, brokerName), }, - }, - { + }, { Name: "Successful Reconciliation, broker ignored because mismatching BrokerClass", Key: testKey, Objects: []runtime.Object{ @@ -728,8 +745,7 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions, WithBrokerClass("broker-class-mismatch")), }, - }, - { + }, { Name: "Successful Reconciliation, status update fails", Key: testKey, Objects: []runtime.Object{ @@ -747,6 +763,9 @@ func TestReconcile(t *testing.T) { WithServiceOwnerReferences(ownerReferences()), WithServiceLabels(resources.FilterLabels(brokerName)), WithServicePorts(servicePorts(8080))), + NewEndpoints(filterServiceName, testNS, + WithEndpointsLabels(resources.FilterLabels(brokerName)), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), NewDeployment(ingressDeploymentName, testNS, WithDeploymentOwnerReferences(ownerReferences()), WithDeploymentLabels(resources.IngressLabels(brokerName)), @@ -756,6 +775,9 @@ func TestReconcile(t *testing.T) { WithServiceOwnerReferences(ownerReferences()), WithServiceLabels(resources.IngressLabels(brokerName)), WithServicePorts(servicePorts(8080))), + NewEndpoints(ingressServiceName, testNS, + WithEndpointsLabels(resources.IngressLabels(brokerName)), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), }, WithReactors: []clientgotesting.ReactionFunc{ InduceFailure("update", "brokers"), @@ -795,6 +817,9 @@ func TestReconcile(t *testing.T) { WithServiceOwnerReferences(ownerReferences()), WithServiceLabels(resources.FilterLabels(brokerName)), WithServicePorts(servicePorts(8080))), + NewEndpoints(filterServiceName, testNS, + WithEndpointsLabels(resources.FilterLabels(brokerName)), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), NewDeployment(ingressDeploymentName, testNS, WithDeploymentOwnerReferences(ownerReferences()), WithDeploymentLabels(resources.IngressLabels(brokerName)), @@ -804,6 +829,9 @@ func TestReconcile(t *testing.T) { WithServiceOwnerReferences(ownerReferences()), WithServiceLabels(resources.IngressLabels(brokerName)), WithServicePorts(servicePorts(8080))), + NewEndpoints(ingressServiceName, testNS, + WithEndpointsLabels(resources.IngressLabels(brokerName)), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), NewTrigger(triggerName, testNS, brokerName, WithTriggerUID(triggerUID), WithTriggerSubscriberURI(subscriberURI)), @@ -1450,6 +1478,7 @@ func TestReconcile(t *testing.T) { triggerLister: listers.GetTriggerLister(), brokerLister: listers.GetBrokerLister(), serviceLister: listers.GetK8sServiceLister(), + endpointsLister: listers.GetEndpointsLister(), deploymentLister: listers.GetDeploymentLister(), filterImage: filterImage, filterServiceAccountName: filterSA, @@ -1793,6 +1822,9 @@ func allBrokerObjectsReadyPlus(objs ...runtime.Object) []runtime.Object { WithServiceOwnerReferences(ownerReferences()), WithServiceLabels(resources.FilterLabels(brokerName)), WithServicePorts(servicePorts(8080))), + NewEndpoints(filterServiceName, testNS, + WithEndpointsLabels(resources.FilterLabels(brokerName)), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), NewDeployment(ingressDeploymentName, testNS, WithDeploymentOwnerReferences(ownerReferences()), WithDeploymentLabels(resources.IngressLabels(brokerName)), @@ -1802,6 +1834,9 @@ func allBrokerObjectsReadyPlus(objs ...runtime.Object) []runtime.Object { WithServiceOwnerReferences(ownerReferences()), WithServiceLabels(resources.IngressLabels(brokerName)), WithServicePorts(servicePorts(8080))), + NewEndpoints(ingressServiceName, testNS, + WithEndpointsLabels(resources.IngressLabels(brokerName)), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), } return append(brokerObjs[:], objs...) } diff --git a/pkg/reconciler/broker/controller.go b/pkg/reconciler/broker/controller.go index eb7cbf219d9..e6306bc7e89 100644 --- a/pkg/reconciler/broker/controller.go +++ b/pkg/reconciler/broker/controller.go @@ -37,6 +37,7 @@ import ( "knative.dev/pkg/client/injection/ducks/duck/v1/addressable" "knative.dev/pkg/client/injection/ducks/duck/v1/conditions" deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" + endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints" serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -78,11 +79,13 @@ func NewController( triggerInformer := triggerinformer.Get(ctx) subscriptionInformer := subscriptioninformer.Get(ctx) serviceInformer := serviceinformer.Get(ctx) + endpointsInformer := endpointsinformer.Get(ctx) r := &Reconciler{ Base: reconciler.NewBase(ctx, controllerAgentName, cmw), brokerLister: brokerInformer.Lister(), serviceLister: serviceInformer.Lister(), + endpointsLister: endpointsInformer.Lister(), deploymentLister: deploymentInformer.Lister(), subscriptionLister: subscriptionInformer.Lister(), triggerLister: triggerInformer.Lister(), @@ -112,6 +115,11 @@ func NewController( Handler: controller.HandleAll(impl.EnqueueControllerOf), }) + endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: pkgreconciler.LabelExistsFilterFunc(eventing.BrokerLabelKey), + Handler: controller.HandleAll(impl.EnqueueLabelOfNamespaceScopedResource("" /*any namespace*/, eventing.BrokerLabelKey)), + }) + deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterGroupKind(v1alpha1.Kind("Broker")), Handler: controller.HandleAll(impl.EnqueueControllerOf), diff --git a/pkg/reconciler/broker/controller_test.go b/pkg/reconciler/broker/controller_test.go index 6ac4f58dca8..8f1264985fa 100644 --- a/pkg/reconciler/broker/controller_test.go +++ b/pkg/reconciler/broker/controller_test.go @@ -30,6 +30,7 @@ import ( _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" _ "knative.dev/pkg/client/injection/ducks/duck/v1/conditions/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/fake" + _ "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake" ) diff --git a/pkg/reconciler/broker/resources/filter.go b/pkg/reconciler/broker/resources/filter.go index f595377807b..d8f6f939b7c 100644 --- a/pkg/reconciler/broker/resources/filter.go +++ b/pkg/reconciler/broker/resources/filter.go @@ -62,77 +62,65 @@ func MakeFilterDeployment(args *FilterArgs) *appsv1.Deployment { }, Spec: corev1.PodSpec{ ServiceAccountName: args.ServiceAccountName, - Containers: []corev1.Container{ - { - Name: filterContainerName, - Image: args.Image, - LivenessProbe: &corev1.Probe{ - Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/healthz", - Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080}, - }, + Containers: []corev1.Container{{ + Name: filterContainerName, + Image: args.Image, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080}, }, - InitialDelaySeconds: 5, - PeriodSeconds: 2, }, - ReadinessProbe: &corev1.Probe{ - Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/readyz", - Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080}, - }, + InitialDelaySeconds: 5, + PeriodSeconds: 2, + }, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/readyz", + Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080}, }, - InitialDelaySeconds: 5, - PeriodSeconds: 2, }, - Env: []corev1.EnvVar{ - { - Name: system.NamespaceEnvKey, - Value: system.Namespace(), - }, - { - Name: "NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, - { - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.name", - }, - }, - }, - { - Name: "CONTAINER_NAME", - Value: filterContainerName, - }, - { - Name: "BROKER", - Value: args.Broker.Name, - }, - // Used for StackDriver only. - { - Name: "METRICS_DOMAIN", - Value: "knative.dev/internal/eventing", + InitialDelaySeconds: 5, + PeriodSeconds: 2, + }, + Env: []corev1.EnvVar{{ + Name: system.NamespaceEnvKey, + Value: system.Namespace(), + }, { + Name: "NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", }, }, - Ports: []corev1.ContainerPort{ - { - ContainerPort: 8080, - Name: "http", - }, - { - ContainerPort: 9090, - Name: "metrics", + }, { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", }, }, - }, - }, + }, { + Name: "CONTAINER_NAME", + Value: filterContainerName, + }, { + Name: "BROKER", + Value: args.Broker.Name, + }, { + // Used for StackDriver only. + Name: "METRICS_DOMAIN", + Value: "knative.dev/internal/eventing", + }}, + Ports: []corev1.ContainerPort{{ + ContainerPort: 8080, + Name: "http", + }, { + ContainerPort: 9090, + Name: "metrics", + }}, + }}, }, }, }, @@ -152,17 +140,14 @@ func MakeFilterService(b *eventingv1alpha1.Broker) *corev1.Service { }, Spec: corev1.ServiceSpec{ Selector: FilterLabels(b.Name), - Ports: []corev1.ServicePort{ - { - Name: "http", - Port: 80, - TargetPort: intstr.FromInt(8080), - }, - { - Name: "http-metrics", - Port: 9090, - }, - }, + Ports: []corev1.ServicePort{{ + Name: "http", + Port: 80, + TargetPort: intstr.FromInt(8080), + }, { + Name: "http-metrics", + Port: 9090, + }}, }, } } diff --git a/pkg/reconciler/broker/resources/ingress.go b/pkg/reconciler/broker/resources/ingress.go index 5914a94d35f..41e951e4550 100644 --- a/pkg/reconciler/broker/resources/ingress.go +++ b/pkg/reconciler/broker/resources/ingress.go @@ -63,75 +63,61 @@ func MakeIngressDeployment(args *IngressArgs) *appsv1.Deployment { }, Spec: corev1.PodSpec{ ServiceAccountName: args.ServiceAccountName, - Containers: []corev1.Container{ - { - Image: args.Image, - Name: ingressContainerName, - LivenessProbe: &corev1.Probe{ - Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/healthz", - Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080}, - }, + Containers: []corev1.Container{{ + Image: args.Image, + Name: ingressContainerName, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080}, }, - InitialDelaySeconds: 5, - PeriodSeconds: 2, }, - Env: []corev1.EnvVar{ - { - Name: system.NamespaceEnvKey, - Value: system.Namespace(), - }, - { - Name: "NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, - { - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.name", - }, - }, - }, - { - Name: "CONTAINER_NAME", - Value: ingressContainerName, - }, - { - Name: "FILTER", - Value: "", // TODO Add one. - }, - { - Name: "CHANNEL", - Value: args.ChannelAddress, - }, - { - Name: "BROKER", - Value: args.Broker.Name, - }, - // Used for StackDriver only. - { - Name: "METRICS_DOMAIN", - Value: "knative.dev/internal/eventing", + InitialDelaySeconds: 5, + PeriodSeconds: 2, + }, + Env: []corev1.EnvVar{{ + Name: system.NamespaceEnvKey, + Value: system.Namespace(), + }, { + Name: "NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", }, }, - Ports: []corev1.ContainerPort{ - { - ContainerPort: 8080, - Name: "http", - }, - { - ContainerPort: 9090, - Name: "metrics", + }, { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", }, }, - }, - }, + }, { + Name: "CONTAINER_NAME", + Value: ingressContainerName, + }, { + Name: "FILTER", + Value: "", // TODO Add one. + }, { + Name: "CHANNEL", + Value: args.ChannelAddress, + }, { + Name: "BROKER", + Value: args.Broker.Name, + }, { + // Used for StackDriver only. + Name: "METRICS_DOMAIN", + Value: "knative.dev/internal/eventing", + }}, + Ports: []corev1.ContainerPort{{ + ContainerPort: 8080, + Name: "http", + }, { + ContainerPort: 9090, + Name: "metrics", + }}, + }}, }, }, }, @@ -152,17 +138,14 @@ func MakeIngressService(b *eventingv1alpha1.Broker) *corev1.Service { }, Spec: corev1.ServiceSpec{ Selector: IngressLabels(b.Name), - Ports: []corev1.ServicePort{ - { - Name: "http", - Port: 80, - TargetPort: intstr.FromInt(8080), - }, - { - Name: "http-metrics", - Port: 9090, - }, - }, + Ports: []corev1.ServicePort{{ + Name: "http", + Port: 80, + TargetPort: intstr.FromInt(8080), + }, { + Name: "http-metrics", + Port: 9090, + }}, }, } } diff --git a/pkg/reconciler/broker/trigger.go b/pkg/reconciler/broker/trigger.go index 1e10abe6e7e..5c3cd7892bd 100644 --- a/pkg/reconciler/broker/trigger.go +++ b/pkg/reconciler/broker/trigger.go @@ -37,6 +37,7 @@ import ( "knative.dev/eventing/pkg/reconciler/trigger/path" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/kmeta" ) const ( @@ -52,7 +53,7 @@ const ( triggerServiceFailed = "TriggerServiceFailed" ) -func (r *Reconciler) reconcileTrigger(ctx context.Context, b *v1alpha1.Broker, t *v1alpha1.Trigger, filterSvc *corev1.Service) error { +func (r *Reconciler) reconcileTrigger(ctx context.Context, b *v1alpha1.Broker, t *v1alpha1.Trigger, filterSvc kmeta.Accessor) error { t.Status.InitializeConditions() if t.DeletionTimestamp != nil { @@ -100,13 +101,13 @@ func (r *Reconciler) reconcileTrigger(ctx context.Context, b *v1alpha1.Broker, t } // subscribeToBrokerChannel subscribes service 'svc' to the Broker's channels. -func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *v1alpha1.Broker, t *v1alpha1.Trigger, brokerTrigger *corev1.ObjectReference, svc *corev1.Service) (*messagingv1alpha1.Subscription, error) { +func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *v1alpha1.Broker, t *v1alpha1.Trigger, brokerTrigger *corev1.ObjectReference, svc kmeta.Accessor) (*messagingv1alpha1.Subscription, error) { if svc == nil { return nil, fmt.Errorf("service for broker is nil") } uri := &apis.URL{ Scheme: "http", - Host: names.ServiceHostName(svc.Name, svc.Namespace), + Host: names.ServiceHostName(svc.GetName(), svc.GetNamespace()), Path: path.Generate(t), } // Note that we have to hard code the brokerGKV stuff as sometimes typemeta is not diff --git a/pkg/reconciler/testing/broker.go b/pkg/reconciler/testing/broker.go index 756ea512624..576618a68cf 100644 --- a/pkg/reconciler/testing/broker.go +++ b/pkg/reconciler/testing/broker.go @@ -132,15 +132,15 @@ func WithTriggerChannelReady() BrokerOption { } } -func WithFilterDeploymentAvailable() BrokerOption { +func WithFilterAvailable() BrokerOption { return func(b *v1alpha1.Broker) { - b.Status.PropagateFilterDeploymentAvailability(v1alpha1.TestHelper.AvailableDeployment()) + b.Status.PropagateFilterAvailability(v1alpha1.TestHelper.AvailableEndpoints()) } } -func WithIngressDeploymentAvailable() BrokerOption { +func WithIngressAvailable() BrokerOption { return func(b *v1alpha1.Broker) { - b.Status.PropagateIngressDeploymentAvailability(v1alpha1.TestHelper.AvailableDeployment()) + b.Status.PropagateIngressAvailability(v1alpha1.TestHelper.AvailableEndpoints()) } } diff --git a/pkg/reconciler/testing/endpoints.go b/pkg/reconciler/testing/endpoints.go new file mode 100644 index 00000000000..020d458419c --- /dev/null +++ b/pkg/reconciler/testing/endpoints.go @@ -0,0 +1,67 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// EndpointsOption enables further configuration of a Endpoints. +type EndpointsOption func(*corev1.Endpoints) + +// NewEndpoints creates a Endpoints with EndpointsOptions +func NewEndpoints(name, namespace string, so ...EndpointsOption) *corev1.Endpoints { + s := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + for _, opt := range so { + opt(s) + } + return s +} + +func WithEndpointsLabels(labels map[string]string) EndpointsOption { + return func(s *corev1.Endpoints) { + s.ObjectMeta.Labels = labels + } +} + +func WithEndpointsAddresses(addrs ...corev1.EndpointAddress) EndpointsOption { + return func(s *corev1.Endpoints) { + s.Subsets = []corev1.EndpointSubset{{ + Addresses: addrs, + }} + } +} + +func WithEndpointsNotReadyAddresses(addrs ...corev1.EndpointAddress) EndpointsOption { + return func(s *corev1.Endpoints) { + s.Subsets = []corev1.EndpointSubset{{ + NotReadyAddresses: addrs, + }} + } +} + +func WithEndpointsAnnotations(annotations map[string]string) EndpointsOption { + return func(s *corev1.Endpoints) { + s.ObjectMeta.Annotations = annotations + } +}