Skip to content

Commit

Permalink
Make Broker availability based on the Endpoints not Deployments.
Browse files Browse the repository at this point in the history
Today the fact that the Broker lifecycle stuff propagates *Deployment* status for filter/ingress leaks implementation details into the API types.  An example of where this leakage is bas would be if I were to run a multi-tenant broker (knative#2760) as something other than a Deployment.

This change corrects one small part of this leaky interface by shifting from Deployment to Endpoints as the canonical resource for establishing readiness.

This is also slightly more correct as it assesses not just whether the Deployment is ready, but whether that information has propagated to the Service / Endpoints.
  • Loading branch information
mattmoor committed Mar 17, 2020
1 parent 0c409f8 commit 0e0bccf
Show file tree
Hide file tree
Showing 14 changed files with 316 additions and 231 deletions.
1 change: 1 addition & 0 deletions config/core/roles/controller-clusterroles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ rules:
- "secrets"
- "configmaps"
- "services"
- "endpoints"
- "events"
- "serviceaccounts"
verbs: &everything
Expand Down
11 changes: 11 additions & 0 deletions pkg/apis/duck/lifecycle_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package duck

import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

// DeploymentIsAvailable determines if the provided deployment is available. Note that if it cannot
Expand All @@ -31,3 +32,13 @@ func DeploymentIsAvailable(d *appsv1.DeploymentStatus, def bool) bool {
}
return def
}

// EndpointsAreAvailable determines if the provided Endpoints are available.
func EndpointsAreAvailable(ep *corev1.Endpoints) bool {
for _, subset := range ep.Subsets {
if len(subset.Addresses) > 0 {
return true
}
}
return false
}
18 changes: 7 additions & 11 deletions pkg/apis/eventing/v1alpha1/broker_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ limitations under the License.
package v1alpha1

import (
appsv1 "k8s.io/api/apps/v1"
"knative.dev/pkg/apis"

corev1 "k8s.io/api/core/v1"
"knative.dev/eventing/pkg/apis/duck"
duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
)
Expand Down Expand Up @@ -63,13 +63,11 @@ func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interfa
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...)
}

func (bs *BrokerStatus) PropagateIngressDeploymentAvailability(d *appsv1.Deployment) {
if duck.DeploymentIsAvailable(&d.Status, true) {
func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) {
if duck.EndpointsAreAvailable(ep) {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress)
} else {
// I don't know how to propagate the status well, so just give the name of the Deployment
// for now.
bs.MarkIngressFailed("DeploymentUnavailable", "The Deployment '%s' is unavailable.", d.Name)
bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name)
}
}

Expand All @@ -91,13 +89,11 @@ func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interfac
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...)
}

func (bs *BrokerStatus) PropagateFilterDeploymentAvailability(d *appsv1.Deployment) {
if duck.DeploymentIsAvailable(&d.Status, true) {
func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) {
if duck.EndpointsAreAvailable(ep) {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter)
} else {
// I don't know how to propagate the status well, so just give the name of the Deployment
// for now.
bs.MarkFilterFailed("DeploymentUnavailable", "The Deployment '%s' is unavailable.", d.Name)
bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name)
}
}

Expand Down
17 changes: 8 additions & 9 deletions pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"

"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
Expand Down Expand Up @@ -316,13 +315,13 @@ func TestBrokerIsReady(t *testing.T) {
t.Run(testName, func(t *testing.T) {
bs := &BrokerStatus{}
if test.markIngressReady != nil {
var d *v1.Deployment
var ep *corev1.Endpoints
if *test.markIngressReady {
d = TestHelper.AvailableDeployment()
ep = TestHelper.AvailableEndpoints()
} else {
d = TestHelper.UnavailableDeployment()
ep = TestHelper.UnavailableEndpoints()
}
bs.PropagateIngressDeploymentAvailability(d)
bs.PropagateIngressAvailability(ep)
}
if test.markTriggerChannelReady != nil {
var c *duckv1alpha1.ChannelableStatus
Expand All @@ -334,13 +333,13 @@ func TestBrokerIsReady(t *testing.T) {
bs.PropagateTriggerChannelReadiness(c)
}
if test.markFilterReady != nil {
var d *v1.Deployment
var ep *corev1.Endpoints
if *test.markFilterReady {
d = TestHelper.AvailableDeployment()
ep = TestHelper.AvailableEndpoints()
} else {
d = TestHelper.UnavailableDeployment()
ep = TestHelper.UnavailableEndpoints()
}
bs.PropagateFilterDeploymentAvailability(d)
bs.PropagateFilterAvailability(ep)
}
bs.SetAddress(test.address)

Expand Down
44 changes: 21 additions & 23 deletions pkg/apis/eventing/v1alpha1/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package v1alpha1

import (
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
pkgduckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1"

Expand Down Expand Up @@ -68,9 +68,9 @@ func (testHelper) FalseSubscriptionStatus() *messagingv1alpha1.SubscriptionStatu

func (t testHelper) ReadyBrokerStatus() *BrokerStatus {
bs := &BrokerStatus{}
bs.PropagateIngressDeploymentAvailability(t.AvailableDeployment())
bs.PropagateIngressAvailability(t.AvailableEndpoints())
bs.PropagateTriggerChannelReadiness(t.ReadyChannelStatus())
bs.PropagateFilterDeploymentAvailability(t.AvailableDeployment())
bs.PropagateFilterAvailability(t.AvailableEndpoints())
bs.SetAddress(&apis.URL{Scheme: "http", Host: "foo"})
return bs
}
Expand Down Expand Up @@ -99,26 +99,24 @@ func (t testHelper) ReadyTriggerStatus() *TriggerStatus {
return ts
}

func (testHelper) UnavailableDeployment() *v1.Deployment {
d := &v1.Deployment{}
d.Name = "unavailable"
d.Status.Conditions = []v1.DeploymentCondition{
{
Type: v1.DeploymentAvailable,
Status: "False",
},
}
return d
func (t testHelper) UnavailableEndpoints() *corev1.Endpoints {
ep := &corev1.Endpoints{}
ep.Name = "unavailable"
ep.Subsets = []corev1.EndpointSubset{{
NotReadyAddresses: []corev1.EndpointAddress{{
IP: "127.0.0.1",
}},
}}
return ep
}

func (t testHelper) AvailableDeployment() *v1.Deployment {
d := t.UnavailableDeployment()
d.Name = "available"
d.Status.Conditions = []v1.DeploymentCondition{
{
Type: v1.DeploymentAvailable,
Status: "True",
},
}
return d
func (t testHelper) AvailableEndpoints() *corev1.Endpoints {
ep := &corev1.Endpoints{}
ep.Name = "available"
ep.Subsets = []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{
IP: "127.0.0.1",
}},
}}
return ep
}
50 changes: 25 additions & 25 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
appsv1listers "k8s.io/client-go/listers/apps/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/kmeta"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/eventing/pkg/apis/eventing"
Expand Down Expand Up @@ -64,6 +65,7 @@ type Reconciler struct {
// listers index properties about resources
brokerLister eventinglisters.BrokerLister
serviceLister corev1listers.ServiceLister
endpointsLister corev1listers.EndpointsLister
deploymentLister appsv1listers.DeploymentLister
subscriptionLister messaginglisters.SubscriptionLister
triggerLister eventinglisters.TriggerLister
Expand Down Expand Up @@ -124,7 +126,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *v1alpha1.Broker) pkgr
return err
}

func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, pkgreconciler.Event) {
func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (kmeta.Accessor, pkgreconciler.Event) {
logging.FromContext(ctx).Debug("Reconciling", zap.Any("Broker", b))
b.Status.InitializeConditions()
b.Status.ObservedGeneration = b.Generation
Expand Down Expand Up @@ -176,42 +178,41 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (*co
b.Status.TriggerChannel = &chanMan.ref
b.Status.PropagateTriggerChannelReadiness(&triggerChan.Status)

filterDeployment, err := r.reconcileFilterDeployment(ctx, b)
if err != nil {
if err := r.reconcileFilterDeployment(ctx, b); err != nil {
logging.FromContext(ctx).Error("Problem reconciling filter Deployment", zap.Error(err))
b.Status.MarkFilterFailed("DeploymentFailure", "%v", err)
return nil, err
}
filterSvc, err := r.reconcileFilterService(ctx, b)
filterEndpoints, err := r.reconcileFilterService(ctx, b)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling filter Service", zap.Error(err))
b.Status.MarkFilterFailed("ServiceFailure", "%v", err)
return nil, err
}
b.Status.PropagateFilterDeploymentAvailability(filterDeployment)
b.Status.PropagateFilterAvailability(filterEndpoints)

ingressDeployment, err := r.reconcileIngressDeployment(ctx, b, triggerChan)
if err != nil {
if err := r.reconcileIngressDeployment(ctx, b, triggerChan); err != nil {
logging.FromContext(ctx).Error("Problem reconciling ingress Deployment", zap.Error(err))
b.Status.MarkIngressFailed("DeploymentFailure", "%v", err)
return nil, err
}

svc, err := r.reconcileIngressService(ctx, b)
ingressEndpoints, err := r.reconcileIngressService(ctx, b)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling ingress Service", zap.Error(err))
b.Status.MarkIngressFailed("ServiceFailure", "%v", err)
return nil, err
}
b.Status.PropagateIngressDeploymentAvailability(ingressDeployment)
// TODO(mattmoor): Use Endpoints
b.Status.PropagateIngressAvailability(ingressEndpoints)
b.Status.SetAddress(&apis.URL{
Scheme: "http",
Host: names.ServiceHostName(svc.Name, svc.Namespace),
Host: names.ServiceHostName(ingressEndpoints.GetName(), ingressEndpoints.GetNamespace()),
})

// So, at this point the Broker is ready and everything should be solid
// for the triggers to act upon.
return filterSvc, nil
return filterEndpoints, nil
}

type channelTemplate struct {
Expand Down Expand Up @@ -292,7 +293,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, b *v1alpha1.Broker) pkgre
}

// reconcileFilterDeployment reconciles Broker's 'b' filter deployment.
func (r *Reconciler) reconcileFilterDeployment(ctx context.Context, b *v1alpha1.Broker) (*v1.Deployment, error) {
func (r *Reconciler) reconcileFilterDeployment(ctx context.Context, b *v1alpha1.Broker) error {
expected := resources.MakeFilterDeployment(&resources.FilterArgs{
Broker: b,
Image: r.filterImage,
Expand All @@ -302,7 +303,7 @@ func (r *Reconciler) reconcileFilterDeployment(ctx context.Context, b *v1alpha1.
}

// reconcileFilterService reconciles Broker's 'b' filter service.
func (r *Reconciler) reconcileFilterService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, error) {
func (r *Reconciler) reconcileFilterService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Endpoints, error) {
expected := resources.MakeFilterService(b)
return r.reconcileService(ctx, expected)
}
Expand Down Expand Up @@ -356,16 +357,15 @@ func TriggerChannelLabels(brokerName string) map[string]string {
}

// reconcileDeployment reconciles the K8s Deployment 'd'.
func (r *Reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment) (*v1.Deployment, error) {
func (r *Reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment) error {
current, err := r.deploymentLister.Deployments(d.Namespace).Get(d.Name)
if apierrs.IsNotFound(err) {
current, err = r.KubeClientSet.AppsV1().Deployments(d.Namespace).Create(d)
if err != nil {
return nil, err
return err
}
return current, nil
} else if err != nil {
return nil, err
return err
}

if !equality.Semantic.DeepDerivative(d.Spec, current.Spec) {
Expand All @@ -374,21 +374,20 @@ func (r *Reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment)
desired.Spec = d.Spec
current, err = r.KubeClientSet.AppsV1().Deployments(current.Namespace).Update(desired)
if err != nil {
return nil, err
return err
}
}
return current, nil
return nil
}

// reconcileService reconciles the K8s Service 'svc'.
func (r *Reconciler) reconcileService(ctx context.Context, svc *corev1.Service) (*corev1.Service, error) {
func (r *Reconciler) reconcileService(ctx context.Context, svc *corev1.Service) (*corev1.Endpoints, error) {
current, err := r.serviceLister.Services(svc.Namespace).Get(svc.Name)
if apierrs.IsNotFound(err) {
current, err = r.KubeClientSet.CoreV1().Services(svc.Namespace).Create(svc)
if err != nil {
return nil, err
}
return current, nil
} else if err != nil {
return nil, err
}
Expand All @@ -405,11 +404,12 @@ func (r *Reconciler) reconcileService(ctx context.Context, svc *corev1.Service)
return nil, err
}
}
return current, nil

return r.endpointsLister.Endpoints(svc.Namespace).Get(svc.Name)
}

// reconcileIngressDeploymentCRD reconciles the Ingress Deployment for a CRD backed channel.
func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1.Broker, c *duckv1alpha1.Channelable) (*v1.Deployment, error) {
func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1.Broker, c *duckv1alpha1.Channelable) error {
expected := resources.MakeIngressDeployment(&resources.IngressArgs{
Broker: b,
Image: r.ingressImage,
Expand All @@ -420,13 +420,13 @@ func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1
}

// reconcileIngressService reconciles the Ingress Service.
func (r *Reconciler) reconcileIngressService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, error) {
func (r *Reconciler) reconcileIngressService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Endpoints, error) {
expected := resources.MakeIngressService(b)
return r.reconcileService(ctx, expected)
}

// reconcileTriggers reconciles the Triggers that are pointed to this broker
func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1alpha1.Broker, filterSvc *corev1.Service) error {
func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1alpha1.Broker, filterSvc kmeta.Accessor) error {

// TODO: Figure out the labels stuff... If webhook does it, we can filter like this...
// Find all the Triggers that have been labeled as belonging to me
Expand Down
Loading

0 comments on commit 0e0bccf

Please sign in to comment.