From faa7c88ec41e704f25736016b25bc3644213271d Mon Sep 17 00:00:00 2001 From: Kirill Sibirev Date: Mon, 13 Jan 2025 11:20:00 +0100 Subject: [PATCH] Refactor namespace events logger --- test/e2e/suite_test.go | 54 ---------------- test/e2e/watcher_test.go | 93 ++++++++++++++++++++++++++++ test/e2e/ytsaurus_controller_test.go | 1 - 3 files changed, 93 insertions(+), 55 deletions(-) create mode 100644 test/e2e/watcher_test.go diff --git a/test/e2e/suite_test.go b/test/e2e/suite_test.go index 6c39df32..a03c0ccd 100644 --- a/test/e2e/suite_test.go +++ b/test/e2e/suite_test.go @@ -33,8 +33,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" clientgoretry "k8s.io/client-go/util/retry" @@ -44,8 +42,6 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -129,56 +125,6 @@ var _ = BeforeEach(func() { }) }) -type NamespaceWatcher struct { - kubeWatcher watch.Interface - stopCh chan struct{} -} - -func NewNamespaceWatcher(ctx context.Context, namespace string) *NamespaceWatcher { - watcher, err := k8sClient.Watch(ctx, &corev1.EventList{}, &client.ListOptions{ - Namespace: namespace, - }) - Expect(err).ToNot(HaveOccurred()) - return &NamespaceWatcher{ - kubeWatcher: watcher, - } -} - -func (w *NamespaceWatcher) Start() { - go w.loop() -} - -func (w *NamespaceWatcher) loop() { - logEvent := func(event *corev1.Event) { - log.Info("Event", - "type", event.Type, - "kind", event.InvolvedObject.Kind, - "name", event.InvolvedObject.Name, - "reason", event.Reason, - "message", event.Message, - ) - } - - for { - select { - case <-w.stopCh: - return - case ev := <-w.kubeWatcher.ResultChan(): - switch ev.Type { - case watch.Added, watch.Modified: - if event, ok := ev.Object.(*corev1.Event); ok { - logEvent(event) - } - } - } - } -} - -func (w *NamespaceWatcher) Stop() { - close(w.stopCh) - w.kubeWatcher.Stop() -} - func NewYtsaurusStatusTracker() func(*ytv1.Ytsaurus) bool { prevStatus := ytv1.YtsaurusStatus{} conditions := map[string]metav1.Condition{} diff --git a/test/e2e/watcher_test.go b/test/e2e/watcher_test.go new file mode 100644 index 00000000..dfe88b92 --- /dev/null +++ b/test/e2e/watcher_test.go @@ -0,0 +1,93 @@ +package controllers_test + +import ( + "context" + + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type NamespaceWatcher struct { + kubeWatcher watch.Interface + stopCh chan struct{} + events []WatcherEvent +} + +type WatcherEvent struct { + Kind string + Name string +} + +func NewNamespaceWatcher(ctx context.Context, namespace string) *NamespaceWatcher { + watcher, err := k8sClient.Watch(ctx, &corev1.EventList{}, &client.ListOptions{ + Namespace: namespace, + }) + Expect(err).ToNot(HaveOccurred()) + return &NamespaceWatcher{ + kubeWatcher: watcher, + stopCh: make(chan struct{}), + } +} + +func (w *NamespaceWatcher) Start() { + go w.loop() +} + +func (w *NamespaceWatcher) Stop() { + close(w.stopCh) + w.kubeWatcher.Stop() +} + +func (w *NamespaceWatcher) Events() []WatcherEvent { + return w.events +} + +func (w *NamespaceWatcher) loop() { + for { + select { + case <-w.stopCh: + return + case ev := <-w.kubeWatcher.ResultChan(): + w.handleEvent(ev) + } + } +} + +func (w *NamespaceWatcher) handleEvent(ev watch.Event) { + w.maybeLogEvent(ev) + w.maybeStoreEvent(ev) +} + +func (w *NamespaceWatcher) maybeLogEvent(ev watch.Event) { + logEvent := func(event *corev1.Event) { + log.Info("Event", + "type", event.Type, + "kind", event.InvolvedObject.Kind, + "name", event.InvolvedObject.Name, + "reason", event.Reason, + "message", event.Message, + ) + } + + switch ev.Type { + case watch.Added, watch.Modified: + if event, ok := ev.Object.(*corev1.Event); ok { + logEvent(event) + } + } +} + +func (w *NamespaceWatcher) maybeStoreEvent(ev watch.Event) { + switch ev.Type { + case watch.Added, watch.Modified: + if event, ok := ev.Object.(*corev1.Event); ok { + w.events = append(w.events, WatcherEvent{ + Kind: event.InvolvedObject.Kind, + Name: event.InvolvedObject.Name, + }) + } + } +} diff --git a/test/e2e/ytsaurus_controller_test.go b/test/e2e/ytsaurus_controller_test.go index 94e31072..0cc5d71d 100644 --- a/test/e2e/ytsaurus_controller_test.go +++ b/test/e2e/ytsaurus_controller_test.go @@ -219,7 +219,6 @@ var _ = Describe("Basic e2e test for Ytsaurus controller", Label("e2e"), func() }) JustBeforeEach(func(ctx context.Context) { - By("Creating resource objects") for _, object := range objects { By(fmt.Sprintf("Creating %v %v", GetObjectGVK(object), object.GetName()))