From 7db74083b826d4bb06edf5e2f716e92b8e414db3 Mon Sep 17 00:00:00 2001 From: Sunjay Bhatia <5337253+sunjayBhatia@users.noreply.github.com> Date: Mon, 10 Jan 2022 17:15:21 -0500 Subject: [PATCH] internal/k8s: StatusUpdateWriter only writes to update channel when leader (#4266) * internal/k8s: StatusUpdateWriter only writes to update channel when leader Otherwise, when not leader, the buffered update channel can fill up, since the StatusUpdateHandler does not run. This deadlocks the EventHandler. This shows up in e2e tests as intermittent in-cluster smoke-test failures. The failure is more prevalent with more instances, so the smoke test is modified to deploy more apps. The failure occurs when the contour instance Envoy connects to is not the leader. Signed-off-by: Sunjay Bhatia --- internal/k8s/status.go | 14 +++++++- test/e2e/incluster/smoke_test.go | 57 ++++++++++++++++++-------------- 2 files changed, 46 insertions(+), 25 deletions(-) diff --git a/internal/k8s/status.go b/internal/k8s/status.go index 0b1b33d4969..ed962711472 100644 --- a/internal/k8s/status.go +++ b/internal/k8s/status.go @@ -62,6 +62,7 @@ func (m StatusMutatorFunc) Mutate(old client.Object) client.Object { type StatusUpdateHandler struct { log logrus.FieldLogger client client.Client + sendUpdates chan struct{} updateChannel chan StatusUpdate } @@ -69,6 +70,7 @@ func NewStatusUpdateHandler(log logrus.FieldLogger, client client.Client) *Statu return &StatusUpdateHandler{ log: log, client: client, + sendUpdates: make(chan struct{}), updateChannel: make(chan StatusUpdate, 100), } } @@ -109,6 +111,9 @@ func (suh *StatusUpdateHandler) Start(ctx context.Context) error { suh.log.Info("started status update handler") defer suh.log.Info("stopped status update handler") + // Enable StatusUpdaters to start sending updates to this handler. 
+ close(suh.sendUpdates) + for { select { case <-ctx.Done(): @@ -128,6 +133,7 @@ func (suh *StatusUpdateHandler) Start(ctx context.Context) error { // Writer retrieves the interface that should be used to write to the StatusUpdateHandler. func (suh *StatusUpdateHandler) Writer() StatusUpdater { return &StatusUpdateWriter{ + enabled: suh.sendUpdates, updateChannel: suh.updateChannel, } } @@ -139,10 +145,16 @@ type StatusUpdater interface { // StatusUpdateWriter takes status updates and sends these to the StatusUpdateHandler via a channel. type StatusUpdateWriter struct { + enabled <-chan struct{} updateChannel chan<- StatusUpdate } // Send sends the given StatusUpdate off to the update channel for writing by the StatusUpdateHandler. func (suw *StatusUpdateWriter) Send(update StatusUpdate) { - suw.updateChannel <- update + // Non-blocking receive to see if we should pass along update. + select { + case <-suw.enabled: + suw.updateChannel <- update + default: + } } diff --git a/test/e2e/incluster/smoke_test.go b/test/e2e/incluster/smoke_test.go index e821c3fa01d..aa0d0eb8973 100644 --- a/test/e2e/incluster/smoke_test.go +++ b/test/e2e/incluster/smoke_test.go @@ -17,6 +17,8 @@ package incluster import ( + "fmt" + . "github.com/onsi/ginkgo/v2" contourv1 "github.com/projectcontour/contour/apis/projectcontour/v1" "github.com/projectcontour/contour/test/e2e" @@ -26,36 +28,43 @@ import ( func testSimpleSmoke(namespace string) { Specify("simple smoke test", func() { - f.Fixtures.Echo.Deploy(namespace, "echo") + // Make multiple instances to ensure many events/updates are + // processed correctly. + // This test may become flaky and should be investigated if there + // are changes that cause differences between the leader and + // non-leader contour instances. 
+ for i := 0; i < 20; i++ { + f.Fixtures.Echo.Deploy(namespace, fmt.Sprintf("echo-%d", i)) - p := &contourv1.HTTPProxy{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: namespace, - Name: "smoke-test", - }, - Spec: contourv1.HTTPProxySpec{ - VirtualHost: &contourv1.VirtualHost{ - Fqdn: "smoke-test.projectcontour.io", + p := &contourv1.HTTPProxy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: fmt.Sprintf("smoke-test-%d", i), }, - Routes: []contourv1.Route{ - { - Services: []contourv1.Service{ - { - Name: "echo", - Port: 80, + Spec: contourv1.HTTPProxySpec{ + VirtualHost: &contourv1.VirtualHost{ + Fqdn: fmt.Sprintf("smoke-test-%d.projectcontour.io", i), + }, + Routes: []contourv1.Route{ + { + Services: []contourv1.Service{ + { + Name: fmt.Sprintf("echo-%d", i), + Port: 80, + }, }, }, }, }, - }, - } - f.CreateHTTPProxyAndWaitFor(p, e2e.HTTPProxyValid) + } + f.CreateHTTPProxyAndWaitFor(p, e2e.HTTPProxyValid) - res, ok := f.HTTP.RequestUntil(&e2e.HTTPRequestOpts{ - Host: p.Spec.VirtualHost.Fqdn, - Condition: e2e.HasStatusCode(200), - }) - require.NotNil(f.T(), res, "request never succeeded") - require.Truef(f.T(), ok, "expected 200 response code, got %d", res.StatusCode) + res, ok := f.HTTP.RequestUntil(&e2e.HTTPRequestOpts{ + Host: p.Spec.VirtualHost.Fqdn, + Condition: e2e.HasStatusCode(200), + }) + require.NotNil(f.T(), res, "request never succeeded") + require.Truef(f.T(), ok, "expected 200 response code, got %d for echo-%d", res.StatusCode, i) + } }) }