Skip to content

Commit

Permalink
internal/k8s: StatusUpdateWriter only writes to update channel when l…
Browse files Browse the repository at this point in the history
…eader (#4266)

* internal/k8s: StatusUpdateWriter only writes to update channel when leader

Otherwise, when not leader, update buffered channel can be filled up,
since StatusUpdateHandler does not run. This deadlocks the EventHandler.

Shows up in e2e tests in intermittent in-cluster smoke-test failures.
More prevalent with more instances so modified smoke test to test more
apps. The failure occurs when the contour instance Envoy connects to is
not the leader.

Signed-off-by: Sunjay Bhatia <sunjayb@vmware.com>
  • Loading branch information
sunjayBhatia authored Jan 10, 2022
1 parent 4f025fe commit 7db7408
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 25 deletions.
14 changes: 13 additions & 1 deletion internal/k8s/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ func (m StatusMutatorFunc) Mutate(old client.Object) client.Object {
type StatusUpdateHandler struct {
log logrus.FieldLogger
client client.Client
sendUpdates chan struct{}
updateChannel chan StatusUpdate
}

func NewStatusUpdateHandler(log logrus.FieldLogger, client client.Client) *StatusUpdateHandler {
return &StatusUpdateHandler{
log: log,
client: client,
sendUpdates: make(chan struct{}),
updateChannel: make(chan StatusUpdate, 100),
}
}
Expand Down Expand Up @@ -109,6 +111,9 @@ func (suh *StatusUpdateHandler) Start(ctx context.Context) error {
suh.log.Info("started status update handler")
defer suh.log.Info("stopped status update handler")

// Enable StatusUpdaters to start sending updates to this handler.
close(suh.sendUpdates)

for {
select {
case <-ctx.Done():
Expand All @@ -128,6 +133,7 @@ func (suh *StatusUpdateHandler) Start(ctx context.Context) error {
// Writer retrieves the interface that should be used to write to the StatusUpdateHandler.
func (suh *StatusUpdateHandler) Writer() StatusUpdater {
return &StatusUpdateWriter{
enabled: suh.sendUpdates,
updateChannel: suh.updateChannel,
}
}
Expand All @@ -139,10 +145,16 @@ type StatusUpdater interface {

// StatusUpdateWriter takes status updates and sends these to the StatusUpdateHandler via a channel.
type StatusUpdateWriter struct {
enabled <-chan struct{}
updateChannel chan<- StatusUpdate
}

// Send sends the given StatusUpdate off to the update channel for writing by the StatusUpdateHandler.
func (suw *StatusUpdateWriter) Send(update StatusUpdate) {
suw.updateChannel <- update
// Non-blocking receive to see if we should pass along update.
select {
case <-suw.enabled:
suw.updateChannel <- update
default:
}
}
57 changes: 33 additions & 24 deletions test/e2e/incluster/smoke_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package incluster

import (
"fmt"

. "github.com/onsi/ginkgo/v2"
contourv1 "github.com/projectcontour/contour/apis/projectcontour/v1"
"github.com/projectcontour/contour/test/e2e"
Expand All @@ -26,36 +28,43 @@ import (

func testSimpleSmoke(namespace string) {
Specify("simple smoke test", func() {
f.Fixtures.Echo.Deploy(namespace, "echo")
// Make multiple instances to ensure many events/updates are
// processed correctly.
// This test may become flaky and should be investigated if there
// are changes that cause differences between the leader and
// non-leader contour instances.
for i := 0; i < 20; i++ {
f.Fixtures.Echo.Deploy(namespace, fmt.Sprintf("echo-%d", i))

p := &contourv1.HTTPProxy{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: "smoke-test",
},
Spec: contourv1.HTTPProxySpec{
VirtualHost: &contourv1.VirtualHost{
Fqdn: "smoke-test.projectcontour.io",
p := &contourv1.HTTPProxy{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: fmt.Sprintf("smoke-test-%d", i),
},
Routes: []contourv1.Route{
{
Services: []contourv1.Service{
{
Name: "echo",
Port: 80,
Spec: contourv1.HTTPProxySpec{
VirtualHost: &contourv1.VirtualHost{
Fqdn: fmt.Sprintf("smoke-test-%d.projectcontour.io", i),
},
Routes: []contourv1.Route{
{
Services: []contourv1.Service{
{
Name: fmt.Sprintf("echo-%d", i),
Port: 80,
},
},
},
},
},
},
}
f.CreateHTTPProxyAndWaitFor(p, e2e.HTTPProxyValid)
}
f.CreateHTTPProxyAndWaitFor(p, e2e.HTTPProxyValid)

res, ok := f.HTTP.RequestUntil(&e2e.HTTPRequestOpts{
Host: p.Spec.VirtualHost.Fqdn,
Condition: e2e.HasStatusCode(200),
})
require.NotNil(f.T(), res, "request never succeeded")
require.Truef(f.T(), ok, "expected 200 response code, got %d", res.StatusCode)
res, ok := f.HTTP.RequestUntil(&e2e.HTTPRequestOpts{
Host: p.Spec.VirtualHost.Fqdn,
Condition: e2e.HasStatusCode(200),
})
require.NotNil(f.T(), res, "request never succeeded")
require.Truef(f.T(), ok, "expected 200 response code, got %d for echo-%d", res.StatusCode, i)
}
})
}

0 comments on commit 7db7408

Please sign in to comment.