Skip to content

Commit

Permalink
Issue 1656: Liveness and Readiness Checks for the Broker Ingress and …
Browse files Browse the repository at this point in the history
…Filter (#1699)

* add liveness and readiness check for broker

* add liveness and readiness check for broker(#1656)

* add liveness and readiness check for broker(#1656)
  • Loading branch information
grac3gao-zz authored and knative-prow-robot committed Aug 20, 2019
1 parent a459865 commit cc5ccb7
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 26 deletions.
7 changes: 7 additions & 0 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ func main() {
if err != nil {
logger.Fatal("Unable to create CE transport", zap.Error(err))
}

// Liveness check.
httpTransport.Handler = http.NewServeMux()
httpTransport.Handler.HandleFunc("/healthz", func(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(http.StatusOK)
})

ceClient, err := cloudevents.NewClient(httpTransport, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
logger.Fatal("Unable to create CE client", zap.Error(err))
Expand Down
19 changes: 18 additions & 1 deletion pkg/broker/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net/http"
"net/url"
"sync/atomic"
"time"

"knative.dev/eventing/pkg/logging"
Expand Down Expand Up @@ -59,6 +60,21 @@ func New(logger *zap.Logger, client client.Client) (*Receiver, error) {
if err != nil {
return nil, err
}
// Liveness check.
httpTransport.Handler = http.NewServeMux()
httpTransport.Handler.HandleFunc("/healthz", func(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(http.StatusOK)
})
// Readiness check.
isReady := &atomic.Value{}
isReady.Store(false)
httpTransport.Handler.HandleFunc("/readyz", func(writer http.ResponseWriter, _ *http.Request) {
if isReady == nil || !isReady.Load().(bool) {
http.Error(writer, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
}
writer.WriteHeader(http.StatusOK)
})

ceClient, err := cloudevents.NewClient(httpTransport, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
return nil, err
Expand All @@ -73,7 +89,8 @@ func New(logger *zap.Logger, client client.Client) (*Receiver, error) {
if err != nil {
return nil, err
}

// TODO mark isReady false when the client is too far out of sync.
isReady.Store(true)
return r, nil
}

Expand Down
74 changes: 50 additions & 24 deletions pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
Expand All @@ -232,7 +232,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, "some-other-image", envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, "some-other-image", livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
},
WithReactors: []clientgotesting.ReactionFunc{
InduceFailure("update", "deployments"),
Expand All @@ -250,7 +250,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
}},
WantEvents: []string{
Eventf(corev1.EventTypeWarning, brokerReconcileError, "Broker reconcile error: %v", "inducing failure for update deployments"),
Expand All @@ -269,7 +269,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
},
WithReactors: []clientgotesting.ReactionFunc{
InduceFailure("create", "services"),
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand Down Expand Up @@ -346,7 +346,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -360,7 +360,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, envVars(ingressContainerName), containerPorts(8080)),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080)),
),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Expand Down Expand Up @@ -389,7 +389,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -398,7 +398,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, envVars(ingressContainerName), containerPorts(9090))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(9090))),
},
WithReactors: []clientgotesting.ReactionFunc{
InduceFailure("update", "deployments"),
Expand All @@ -408,7 +408,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
}},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
Expand Down Expand Up @@ -436,7 +436,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -445,7 +445,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
},
WithReactors: []clientgotesting.ReactionFunc{
InduceFailure("create", "services"),
Expand Down Expand Up @@ -483,7 +483,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -492,7 +492,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
NewService(ingressServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.IngressLabels(brokerName)),
Expand Down Expand Up @@ -533,7 +533,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -542,7 +542,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
NewService(ingressServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.IngressLabels(brokerName)),
Expand Down Expand Up @@ -583,7 +583,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -592,7 +592,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
NewService(ingressServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.IngressLabels(brokerName)),
Expand Down Expand Up @@ -641,7 +641,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -650,7 +650,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
NewService(ingressServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.IngressLabels(brokerName)),
Expand Down Expand Up @@ -700,7 +700,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -709,7 +709,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
NewService(ingressServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.IngressLabels(brokerName)),
Expand Down Expand Up @@ -766,7 +766,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -775,7 +775,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
NewService(ingressServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.IngressLabels(brokerName)),
Expand Down Expand Up @@ -838,6 +838,32 @@ func channelCRD() metav1.TypeMeta {
}
}

func livenessProbe() *corev1.Probe {
return &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/healthz",
Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 2,
}
}

func readinessProbe() *corev1.Probe {
return &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/readyz",
Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 2,
}
}

func envVars(containerName string) []corev1.EnvVar {
switch containerName {
case filterContainerName:
Expand Down
20 changes: 20 additions & 0 deletions pkg/reconciler/broker/resources/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,26 @@ func MakeFilterDeployment(args *FilterArgs) *appsv1.Deployment {
{
Name: "filter",
Image: args.Image,
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/healthz",
Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 2,
},
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/readyz",
Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 2,
},
Env: []corev1.EnvVar{
{
Name: system.NamespaceEnvKey,
Expand Down
10 changes: 10 additions & 0 deletions pkg/reconciler/broker/resources/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ func MakeIngress(args *IngressArgs) *appsv1.Deployment {
{
Image: args.Image,
Name: "ingress",
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/healthz",
Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 2,
},
Env: []corev1.EnvVar{
{
Name: system.NamespaceEnvKey,
Expand Down
4 changes: 3 additions & 1 deletion pkg/reconciler/testing/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ func WithDeploymentServiceAccount(serviceAccountName string) DeploymentOption {
}
}

func WithDeploymentContainer(name, image string, envVars []corev1.EnvVar, containerPorts []corev1.ContainerPort) DeploymentOption {
func WithDeploymentContainer(name, image string, liveness *corev1.Probe, readiness *corev1.Probe, envVars []corev1.EnvVar, containerPorts []corev1.ContainerPort) DeploymentOption {
return func(d *appsv1.Deployment) {
d.Spec.Template.Spec.Containers[0].Name = name
d.Spec.Template.Spec.Containers[0].Image = image
d.Spec.Template.Spec.Containers[0].LivenessProbe = liveness
d.Spec.Template.Spec.Containers[0].ReadinessProbe = readiness
d.Spec.Template.Spec.Containers[0].Env = envVars
d.Spec.Template.Spec.Containers[0].Ports = containerPorts
}
Expand Down

0 comments on commit cc5ccb7

Please sign in to comment.