Skip to content

Commit

Permalink
Add batch ID and inject loggers
Browse files Browse the repository at this point in the history
  • Loading branch information
kate-osborn committed Oct 12, 2023
1 parent 0674758 commit 6288b83
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 70 deletions.
23 changes: 13 additions & 10 deletions internal/framework/events/eventsfakes/fake_event_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion internal/framework/events/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package events

import (
"context"

"github.com/go-logr/logr"
)

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . EventHandler
Expand All @@ -10,5 +12,5 @@ import (
type EventHandler interface {
// HandleEventBatch handles a batch of events.
// EventBatch can include duplicated events.
HandleEventBatch(ctx context.Context, batch EventBatch)
HandleEventBatch(ctx context.Context, logger logr.Logger, batch EventBatch)
}
12 changes: 9 additions & 3 deletions internal/framework/events/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type EventLoop struct {
// The batches are swapped before starting the handler goroutine.
currentBatch EventBatch
nextBatch EventBatch

// the ID of the current batch
currentBatchID int
}

// NewEventLoop creates a new EventLoop.
Expand Down Expand Up @@ -63,11 +66,14 @@ func (el *EventLoop) Start(ctx context.Context) error {

handleBatch := func() {
go func(batch EventBatch) {
el.logger.Info("Handling events from the batch", "total", len(batch))
el.currentBatchID++
batchLogger := el.logger.WithName("eventHandler").WithValues("batchID", el.currentBatchID)

batchLogger.Info("Handling events from the batch", "total", len(batch))

el.handler.HandleEventBatch(ctx, batch)
el.handler.HandleEventBatch(ctx, batchLogger, batch)

el.logger.Info("Finished handling the batch")
batchLogger.Info("Finished handling the batch")
handlingDone <- struct{}{}
}(el.currentBatch)
}
Expand Down
11 changes: 6 additions & 5 deletions internal/framework/events/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down Expand Up @@ -47,7 +48,7 @@ var _ = Describe("EventLoop", func() {

// Ensure the first batch is handled
Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(1))
_, batch = fakeHandler.HandleEventBatchArgsForCall(0)
_, _, batch = fakeHandler.HandleEventBatchArgsForCall(0)

var expectedBatch events.EventBatch = []interface{}{"event0"}
Expect(batch).Should(Equal(expectedBatch))
Expand All @@ -70,7 +71,7 @@ var _ = Describe("EventLoop", func() {
eventCh <- e

Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(2))
_, batch := fakeHandler.HandleEventBatchArgsForCall(1)
_, _, batch := fakeHandler.HandleEventBatchArgsForCall(1)

var expectedBatch events.EventBatch = []interface{}{e}
Expect(batch).Should(Equal(expectedBatch))
Expand All @@ -82,7 +83,7 @@ var _ = Describe("EventLoop", func() {

// The func below will pause the handler goroutine while it is processing the batch with e1 until
// sentSecondAndThirdEvents is closed. This way we can add e2 and e3 to the current batch in the meantime.
fakeHandler.HandleEventBatchCalls(func(ctx context.Context, batch events.EventBatch) {
fakeHandler.HandleEventBatchCalls(func(ctx context.Context, logger logr.Logger, batch events.EventBatch) {
close(firstHandleEventBatchCallInProgress)
<-sentSecondAndThirdEvents
})
Expand All @@ -106,14 +107,14 @@ var _ = Describe("EventLoop", func() {
close(sentSecondAndThirdEvents)

Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(3))
_, batch := fakeHandler.HandleEventBatchArgsForCall(1)
_, _, batch := fakeHandler.HandleEventBatchArgsForCall(1)

var expectedBatch events.EventBatch = []interface{}{e1}

// the first HandleEventBatch() call must have handled a batch with e1
Expect(batch).Should(Equal(expectedBatch))

_, batch = fakeHandler.HandleEventBatchArgsForCall(2)
_, _, batch = fakeHandler.HandleEventBatchArgsForCall(2)

expectedBatch = []interface{}{e2, e3}
// the second HandleEventBatch() call must have handled a batch with e2 and e3
Expand Down
13 changes: 5 additions & 8 deletions internal/mode/provisioner/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type eventHandler struct {

statusUpdater status.Updater
k8sClient client.Client
logger logr.Logger

staticModeDeploymentYAML []byte

Expand All @@ -38,7 +37,6 @@ func newEventHandler(
gcName string,
statusUpdater status.Updater,
k8sClient client.Client,
logger logr.Logger,
staticModeDeploymentYAML []byte,
) *eventHandler {
return &eventHandler{
Expand All @@ -47,7 +45,6 @@ func newEventHandler(
statusUpdater: statusUpdater,
gcName: gcName,
k8sClient: k8sClient,
logger: logger,
staticModeDeploymentYAML: staticModeDeploymentYAML,
gatewayNextID: 1,
}
Expand Down Expand Up @@ -80,7 +77,7 @@ func (h *eventHandler) setGatewayClassStatuses(ctx context.Context) {
h.statusUpdater.Update(ctx, statuses)
}

func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) {
func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context, logger logr.Logger) {
var gwsWithoutDeps, removedGwsWithDeps []types.NamespacedName

for nsname, gw := range h.store.gateways {
Expand Down Expand Up @@ -116,7 +113,7 @@ func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) {

h.provisions[nsname] = deployment

h.logger.Info(
logger.Info(
"Created deployment",
"deployment", client.ObjectKeyFromObject(deployment),
"gateway", nsname,
Expand All @@ -134,18 +131,18 @@ func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) {

delete(h.provisions, nsname)

h.logger.Info(
logger.Info(
"Deleted deployment",
"deployment", client.ObjectKeyFromObject(deployment),
"gateway", nsname,
)
}
}

func (h *eventHandler) HandleEventBatch(ctx context.Context, batch events.EventBatch) {
func (h *eventHandler) HandleEventBatch(ctx context.Context, logger logr.Logger, batch events.EventBatch) {
h.store.update(batch)
h.setGatewayClassStatuses(ctx)
h.ensureDeploymentsMatchGateways(ctx)
h.ensureDeploymentsMatchGateways(ctx, logger)
}

func (h *eventHandler) generateDeploymentID() string {
Expand Down
23 changes: 10 additions & 13 deletions internal/mode/provisioner/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ var _ = Describe("handler", func() {
Resource: gc,
},
}
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

// Ensure GatewayClass is accepted

Expand Down Expand Up @@ -126,7 +126,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

depNsName := types.NamespacedName{
Namespace: "nginx-gateway",
Expand Down Expand Up @@ -156,7 +156,7 @@ var _ = Describe("handler", func() {
}

handle := func() {
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
Expand All @@ -179,7 +179,6 @@ var _ = Describe("handler", func() {
gcName,
statusUpdater,
k8sclient,
zap.New(),
embeddedfiles.StaticModeDeploymentYAML,
)
})
Expand Down Expand Up @@ -217,7 +216,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
deps := &v1.DeploymentList{}

err := k8sclient.List(context.Background(), deps)
Expand All @@ -237,7 +236,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

deps := &v1.DeploymentList{}

Expand Down Expand Up @@ -266,7 +265,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

deps := &v1.DeploymentList{}
err := k8sclient.List(context.Background(), deps)
Expand Down Expand Up @@ -295,7 +294,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

unknownGC := &v1beta1.GatewayClass{}
err = k8sclient.Get(context.Background(), client.ObjectKeyFromObject(gc), unknownGC)
Expand Down Expand Up @@ -330,7 +329,6 @@ var _ = Describe("handler", func() {
gcName,
statusUpdater,
k8sclient,
zap.New(),
embeddedfiles.StaticModeDeploymentYAML,
)
})
Expand All @@ -340,7 +338,7 @@ var _ = Describe("handler", func() {
batch := []interface{}{e}

handle := func() {
handler.HandleEventBatch(context.TODO(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
Expand Down Expand Up @@ -408,7 +406,7 @@ var _ = Describe("handler", func() {
}

handle := func() {
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
Expand All @@ -429,7 +427,7 @@ var _ = Describe("handler", func() {
}

handle := func() {
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
Expand All @@ -442,7 +440,6 @@ var _ = Describe("handler", func() {
gcName,
statusUpdater,
k8sclient,
zap.New(),
[]byte("broken YAML"),
)

Expand Down
1 change: 0 additions & 1 deletion internal/mode/provisioner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func StartManager(cfg Config) error {
cfg.GatewayClassName,
statusUpdater,
mgr.GetClient(),
cfg.Logger.WithName("eventHandler"),
embeddedfiles.StaticModeDeploymentYAML,
)

Expand Down
Loading

0 comments on commit 6288b83

Please sign in to comment.