Skip to content

Commit

Permalink
add liveness and readiness check for broker(knative#1656)
Browse files Browse the repository at this point in the history
  • Loading branch information
grac3gao committed Aug 20, 2019
1 parent 7208fb1 commit a721aff
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 42 deletions.
2 changes: 1 addition & 1 deletion cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func main() {
logger.Fatal("Unable to create CE transport", zap.Error(err))
}

//liveness check
// Liveness check.
httpTransport.Handler = http.NewServeMux()
httpTransport.Handler.HandleFunc("/healthz", func(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(http.StatusOK)
Expand Down
5 changes: 3 additions & 2 deletions pkg/broker/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ func New(logger *zap.Logger, client client.Client) (*Receiver, error) {
if err != nil {
return nil, err
}
//liveness check
// Liveness check.
httpTransport.Handler = http.NewServeMux()
httpTransport.Handler.HandleFunc("/healthz", func(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(http.StatusOK)
})
//readiness check
// Readiness check.
isReady := &atomic.Value{}
isReady.Store(false)
httpTransport.Handler.HandleFunc("/readyz", func(writer http.ResponseWriter, _ *http.Request) {
Expand All @@ -89,6 +89,7 @@ func New(logger *zap.Logger, client client.Client) (*Receiver, error) {
if err != nil {
return nil, err
}
// TODO mark isReady false when the client is too far out of sync.
isReady.Store(true)
return r, nil
}
Expand Down
72 changes: 33 additions & 39 deletions pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
Expand All @@ -238,7 +238,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, "some-other-image", livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, "some-other-image", livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
},
WithReactors: []clientgotesting.ReactionFunc{
InduceFailure("update", "deployments"),
Expand All @@ -256,7 +256,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
}},
WantEvents: []string{
Eventf(corev1.EventTypeWarning, brokerReconcileError, "Broker reconcile error: %v", "inducing failure for update deployments"),
Expand All @@ -275,7 +275,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
},
WithReactors: []clientgotesting.ReactionFunc{
InduceFailure("create", "services"),
Expand Down Expand Up @@ -312,7 +312,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand Down Expand Up @@ -352,7 +352,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -366,7 +366,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), readinessProbe(ingressContainerName), envVars(ingressContainerName), containerPorts(8080)),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080)),
),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Expand Down Expand Up @@ -395,7 +395,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -404,7 +404,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), readinessProbe(ingressContainerName), envVars(ingressContainerName), containerPorts(9090))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(9090))),
},
WithReactors: []clientgotesting.ReactionFunc{
InduceFailure("update", "deployments"),
Expand All @@ -414,7 +414,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), readinessProbe(ingressContainerName), envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
}},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
Expand Down Expand Up @@ -442,7 +442,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -451,7 +451,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), readinessProbe(ingressContainerName), envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
},
WithReactors: []clientgotesting.ReactionFunc{
InduceFailure("create", "services"),
Expand Down Expand Up @@ -489,7 +489,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -498,7 +498,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), readinessProbe(ingressContainerName), envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
NewService(ingressServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.IngressLabels(brokerName)),
Expand Down Expand Up @@ -539,7 +539,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -548,7 +548,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), readinessProbe(ingressContainerName), envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
NewService(ingressServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.IngressLabels(brokerName)),
Expand Down Expand Up @@ -589,7 +589,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -598,7 +598,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), readinessProbe(ingressContainerName), envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
NewService(ingressServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.IngressLabels(brokerName)),
Expand Down Expand Up @@ -647,7 +647,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -656,7 +656,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), readinessProbe(ingressContainerName), envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
NewService(ingressServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.IngressLabels(brokerName)),
Expand Down Expand Up @@ -706,7 +706,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -715,7 +715,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), readinessProbe(ingressContainerName), envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
NewService(ingressServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.IngressLabels(brokerName)),
Expand Down Expand Up @@ -772,7 +772,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.FilterLabels(brokerName)),
WithDeploymentServiceAccount(filterSA),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(filterContainerName), envVars(filterContainerName), containerPorts(8080))),
WithDeploymentContainer(filterContainerName, filterImage, livenessProbe(), readinessProbe(), envVars(filterContainerName), containerPorts(8080))),
NewService(filterServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.FilterLabels(brokerName)),
Expand All @@ -781,7 +781,7 @@ func TestReconcileCRD(t *testing.T) {
WithDeploymentOwnerReferences(ownerReferences()),
WithDeploymentLabels(resources.IngressLabels(brokerName)),
WithDeploymentServiceAccount(ingressSA),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), readinessProbe(ingressContainerName), envVars(ingressContainerName), containerPorts(8080))),
WithDeploymentContainer(ingressContainerName, ingressImage, livenessProbe(), nil, envVars(ingressContainerName), containerPorts(8080))),
NewService(ingressServiceName, testNS,
WithServiceOwnerReferences(ownerReferences()),
WithServiceLabels(resources.IngressLabels(brokerName)),
Expand Down Expand Up @@ -865,23 +865,17 @@ func livenessProbe() *corev1.Probe {
}
}

func readinessProbe(containerName string) *corev1.Probe{
switch containerName {
case filterContainerName:
return &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/readyz",
Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
func readinessProbe() *corev1.Probe {
return &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/readyz",
Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
InitialDelaySeconds: 5,
PeriodSeconds: 2,
}
case ingressContainerName:
return nil
},
InitialDelaySeconds: 5,
PeriodSeconds: 2,
}
return &corev1.Probe{}
}

func envVars(containerName string) []corev1.EnvVar {
Expand Down

0 comments on commit a721aff

Please sign in to comment.