Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Broker availability based on the Endpoints not Deployments. #2766

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/core/roles/controller-clusterroles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ rules:
- "secrets"
- "configmaps"
- "services"
- "endpoints"
- "events"
- "serviceaccounts"
verbs: &everything
Expand Down
11 changes: 11 additions & 0 deletions pkg/apis/duck/lifecycle_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package duck

import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

// DeploymentIsAvailable determines if the provided deployment is available. Note that if it cannot
Expand All @@ -31,3 +32,13 @@ func DeploymentIsAvailable(d *appsv1.DeploymentStatus, def bool) bool {
}
return def
}

// EndpointsAreAvailable determines if the provided Endpoints are available.
func EndpointsAreAvailable(ep *corev1.Endpoints) bool {
for _, subset := range ep.Subsets {
if len(subset.Addresses) > 0 {
return true
}
}
return false
}
18 changes: 7 additions & 11 deletions pkg/apis/eventing/v1alpha1/broker_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ limitations under the License.
package v1alpha1

import (
appsv1 "k8s.io/api/apps/v1"
"knative.dev/pkg/apis"

corev1 "k8s.io/api/core/v1"
"knative.dev/eventing/pkg/apis/duck"
duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
)
Expand Down Expand Up @@ -63,13 +63,11 @@ func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interfa
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...)
}

func (bs *BrokerStatus) PropagateIngressDeploymentAvailability(d *appsv1.Deployment) {
if duck.DeploymentIsAvailable(&d.Status, true) {
func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) {
if duck.EndpointsAreAvailable(ep) {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress)
} else {
// I don't know how to propagate the status well, so just give the name of the Deployment
// for now.
bs.MarkIngressFailed("DeploymentUnavailable", "The Deployment '%s' is unavailable.", d.Name)
bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name)
}
}

Expand All @@ -91,13 +89,11 @@ func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interfac
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...)
}

func (bs *BrokerStatus) PropagateFilterDeploymentAvailability(d *appsv1.Deployment) {
if duck.DeploymentIsAvailable(&d.Status, true) {
func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) {
if duck.EndpointsAreAvailable(ep) {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter)
} else {
// I don't know how to propagate the status well, so just give the name of the Deployment
// for now.
bs.MarkFilterFailed("DeploymentUnavailable", "The Deployment '%s' is unavailable.", d.Name)
bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name)
}
}

Expand Down
17 changes: 8 additions & 9 deletions pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"

"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
Expand Down Expand Up @@ -316,13 +315,13 @@ func TestBrokerIsReady(t *testing.T) {
t.Run(testName, func(t *testing.T) {
bs := &BrokerStatus{}
if test.markIngressReady != nil {
var d *v1.Deployment
var ep *corev1.Endpoints
if *test.markIngressReady {
d = TestHelper.AvailableDeployment()
ep = TestHelper.AvailableEndpoints()
} else {
d = TestHelper.UnavailableDeployment()
ep = TestHelper.UnavailableEndpoints()
}
bs.PropagateIngressDeploymentAvailability(d)
bs.PropagateIngressAvailability(ep)
}
if test.markTriggerChannelReady != nil {
var c *duckv1alpha1.ChannelableStatus
Expand All @@ -334,13 +333,13 @@ func TestBrokerIsReady(t *testing.T) {
bs.PropagateTriggerChannelReadiness(c)
}
if test.markFilterReady != nil {
var d *v1.Deployment
var ep *corev1.Endpoints
if *test.markFilterReady {
d = TestHelper.AvailableDeployment()
ep = TestHelper.AvailableEndpoints()
} else {
d = TestHelper.UnavailableDeployment()
ep = TestHelper.UnavailableEndpoints()
}
bs.PropagateFilterDeploymentAvailability(d)
bs.PropagateFilterAvailability(ep)
}
bs.SetAddress(test.address)

Expand Down
44 changes: 21 additions & 23 deletions pkg/apis/eventing/v1alpha1/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package v1alpha1

import (
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
pkgduckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1"

Expand Down Expand Up @@ -68,9 +68,9 @@ func (testHelper) FalseSubscriptionStatus() *messagingv1alpha1.SubscriptionStatu

func (t testHelper) ReadyBrokerStatus() *BrokerStatus {
bs := &BrokerStatus{}
bs.PropagateIngressDeploymentAvailability(t.AvailableDeployment())
bs.PropagateIngressAvailability(t.AvailableEndpoints())
bs.PropagateTriggerChannelReadiness(t.ReadyChannelStatus())
bs.PropagateFilterDeploymentAvailability(t.AvailableDeployment())
bs.PropagateFilterAvailability(t.AvailableEndpoints())
bs.SetAddress(&apis.URL{Scheme: "http", Host: "foo"})
return bs
}
Expand Down Expand Up @@ -99,26 +99,24 @@ func (t testHelper) ReadyTriggerStatus() *TriggerStatus {
return ts
}

func (testHelper) UnavailableDeployment() *v1.Deployment {
d := &v1.Deployment{}
d.Name = "unavailable"
d.Status.Conditions = []v1.DeploymentCondition{
{
Type: v1.DeploymentAvailable,
Status: "False",
},
}
return d
func (t testHelper) UnavailableEndpoints() *corev1.Endpoints {
ep := &corev1.Endpoints{}
ep.Name = "unavailable"
ep.Subsets = []corev1.EndpointSubset{{
NotReadyAddresses: []corev1.EndpointAddress{{
IP: "127.0.0.1",
}},
}}
return ep
}

func (t testHelper) AvailableDeployment() *v1.Deployment {
d := t.UnavailableDeployment()
d.Name = "available"
d.Status.Conditions = []v1.DeploymentCondition{
{
Type: v1.DeploymentAvailable,
Status: "True",
},
}
return d
func (t testHelper) AvailableEndpoints() *corev1.Endpoints {
ep := &corev1.Endpoints{}
ep.Name = "available"
ep.Subsets = []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{
IP: "127.0.0.1",
}},
}}
return ep
}
49 changes: 24 additions & 25 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
appsv1listers "k8s.io/client-go/listers/apps/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/kmeta"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/eventing/pkg/apis/eventing"
Expand Down Expand Up @@ -64,6 +65,7 @@ type Reconciler struct {
// listers index properties about resources
brokerLister eventinglisters.BrokerLister
serviceLister corev1listers.ServiceLister
endpointsLister corev1listers.EndpointsLister
deploymentLister appsv1listers.DeploymentLister
subscriptionLister messaginglisters.SubscriptionLister
triggerLister eventinglisters.TriggerLister
Expand Down Expand Up @@ -124,7 +126,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *v1alpha1.Broker) pkgr
return err
}

func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, pkgreconciler.Event) {
func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (kmeta.Accessor, pkgreconciler.Event) {
logging.FromContext(ctx).Debug("Reconciling", zap.Any("Broker", b))
b.Status.InitializeConditions()
b.Status.ObservedGeneration = b.Generation
Expand Down Expand Up @@ -176,42 +178,40 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (*co
b.Status.TriggerChannel = &chanMan.ref
b.Status.PropagateTriggerChannelReadiness(&triggerChan.Status)

filterDeployment, err := r.reconcileFilterDeployment(ctx, b)
if err != nil {
if err := r.reconcileFilterDeployment(ctx, b); err != nil {
logging.FromContext(ctx).Error("Problem reconciling filter Deployment", zap.Error(err))
b.Status.MarkFilterFailed("DeploymentFailure", "%v", err)
return nil, err
}
filterSvc, err := r.reconcileFilterService(ctx, b)
filterEndpoints, err := r.reconcileFilterService(ctx, b)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling filter Service", zap.Error(err))
b.Status.MarkFilterFailed("ServiceFailure", "%v", err)
return nil, err
}
b.Status.PropagateFilterDeploymentAvailability(filterDeployment)
b.Status.PropagateFilterAvailability(filterEndpoints)

ingressDeployment, err := r.reconcileIngressDeployment(ctx, b, triggerChan)
if err != nil {
if err := r.reconcileIngressDeployment(ctx, b, triggerChan); err != nil {
logging.FromContext(ctx).Error("Problem reconciling ingress Deployment", zap.Error(err))
b.Status.MarkIngressFailed("DeploymentFailure", "%v", err)
return nil, err
}

svc, err := r.reconcileIngressService(ctx, b)
ingressEndpoints, err := r.reconcileIngressService(ctx, b)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling ingress Service", zap.Error(err))
b.Status.MarkIngressFailed("ServiceFailure", "%v", err)
return nil, err
}
b.Status.PropagateIngressDeploymentAvailability(ingressDeployment)
b.Status.PropagateIngressAvailability(ingressEndpoints)
b.Status.SetAddress(&apis.URL{
Scheme: "http",
Host: names.ServiceHostName(svc.Name, svc.Namespace),
Host: names.ServiceHostName(ingressEndpoints.GetName(), ingressEndpoints.GetNamespace()),
})

// So, at this point the Broker is ready and everything should be solid
// for the triggers to act upon.
return filterSvc, nil
return filterEndpoints, nil
}

type channelTemplate struct {
Expand Down Expand Up @@ -292,7 +292,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, b *v1alpha1.Broker) pkgre
}

// reconcileFilterDeployment reconciles Broker's 'b' filter deployment.
func (r *Reconciler) reconcileFilterDeployment(ctx context.Context, b *v1alpha1.Broker) (*v1.Deployment, error) {
func (r *Reconciler) reconcileFilterDeployment(ctx context.Context, b *v1alpha1.Broker) error {
expected := resources.MakeFilterDeployment(&resources.FilterArgs{
Broker: b,
Image: r.filterImage,
Expand All @@ -302,7 +302,7 @@ func (r *Reconciler) reconcileFilterDeployment(ctx context.Context, b *v1alpha1.
}

// reconcileFilterService reconciles Broker's 'b' filter service.
func (r *Reconciler) reconcileFilterService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, error) {
func (r *Reconciler) reconcileFilterService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Endpoints, error) {
expected := resources.MakeFilterService(b)
return r.reconcileService(ctx, expected)
}
Expand Down Expand Up @@ -356,16 +356,15 @@ func TriggerChannelLabels(brokerName string) map[string]string {
}

// reconcileDeployment reconciles the K8s Deployment 'd'.
func (r *Reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment) (*v1.Deployment, error) {
func (r *Reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment) error {
current, err := r.deploymentLister.Deployments(d.Namespace).Get(d.Name)
if apierrs.IsNotFound(err) {
current, err = r.KubeClientSet.AppsV1().Deployments(d.Namespace).Create(d)
if err != nil {
return nil, err
return err
}
return current, nil
} else if err != nil {
return nil, err
return err
}

if !equality.Semantic.DeepDerivative(d.Spec, current.Spec) {
Expand All @@ -374,21 +373,20 @@ func (r *Reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment)
desired.Spec = d.Spec
current, err = r.KubeClientSet.AppsV1().Deployments(current.Namespace).Update(desired)
if err != nil {
return nil, err
return err
}
}
return current, nil
return nil
}

// reconcileService reconciles the K8s Service 'svc'.
func (r *Reconciler) reconcileService(ctx context.Context, svc *corev1.Service) (*corev1.Service, error) {
func (r *Reconciler) reconcileService(ctx context.Context, svc *corev1.Service) (*corev1.Endpoints, error) {
current, err := r.serviceLister.Services(svc.Namespace).Get(svc.Name)
if apierrs.IsNotFound(err) {
current, err = r.KubeClientSet.CoreV1().Services(svc.Namespace).Create(svc)
if err != nil {
return nil, err
}
return current, nil
} else if err != nil {
return nil, err
}
Expand All @@ -405,11 +403,12 @@ func (r *Reconciler) reconcileService(ctx context.Context, svc *corev1.Service)
return nil, err
}
}
return current, nil

return r.endpointsLister.Endpoints(svc.Namespace).Get(svc.Name)
}

// reconcileIngressDeploymentCRD reconciles the Ingress Deployment for a CRD backed channel.
func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1.Broker, c *duckv1alpha1.Channelable) (*v1.Deployment, error) {
func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1.Broker, c *duckv1alpha1.Channelable) error {
expected := resources.MakeIngressDeployment(&resources.IngressArgs{
Broker: b,
Image: r.ingressImage,
Expand All @@ -420,13 +419,13 @@ func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1
}

// reconcileIngressService reconciles the Ingress Service.
func (r *Reconciler) reconcileIngressService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, error) {
func (r *Reconciler) reconcileIngressService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Endpoints, error) {
expected := resources.MakeIngressService(b)
return r.reconcileService(ctx, expected)
}

// reconcileTriggers reconciles the Triggers that are pointed to this broker
func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1alpha1.Broker, filterSvc *corev1.Service) error {
func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1alpha1.Broker, filterSvc kmeta.Accessor) error {

// TODO: Figure out the labels stuff... If webhook does it, we can filter like this...
// Find all the Triggers that have been labeled as belonging to me
Expand Down
Loading