From 92a80e9839654481679a1885e1e163adfc8452c3 Mon Sep 17 00:00:00 2001 From: Kirill Sibirev Date: Mon, 13 Jan 2025 13:50:20 +0100 Subject: [PATCH] Add event aggregator --- test/e2e/combined_watcher_test.go | 151 ++++++++++++++++++++++++++++++ test/e2e/watcher_test.go | 38 ++++---- 2 files changed, 173 insertions(+), 16 deletions(-) create mode 100644 test/e2e/combined_watcher_test.go diff --git a/test/e2e/combined_watcher_test.go b/test/e2e/combined_watcher_test.go new file mode 100644 index 00000000..20ccb742 --- /dev/null +++ b/test/e2e/combined_watcher_test.go @@ -0,0 +1,151 @@ +package controllers_test + +import ( + "context" + "reflect" + "testing" + "time" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" +) + +type combinedKubeWatcher struct { + watchers []watch.Interface + stopCh chan struct{} + muxResultChan chan watch.Event +} + +func newCombinedKubeWatcher(ctx context.Context, kubecli client.WithWatch, namespace string, lists []client.ObjectList) (*combinedKubeWatcher, error) { + muxResultChan := make(chan watch.Event) + stopCh := make(chan struct{}) + + var watchers []watch.Interface + for _, objList := range lists { + watcher, err := kubecli.Watch(ctx, objList, &client.ListOptions{ + Namespace: namespace, + }) + if err != nil { + return nil, err + } + go func(ch <-chan watch.Event) { + for { + select { + case msg, ok := <-ch: + if !ok { + return + } + muxResultChan <- msg + case <-stopCh: + return + } + } + }(watcher.ResultChan()) + watchers = append(watchers, watcher) + } + return &combinedKubeWatcher{ + watchers: watchers, + stopCh: stopCh, + muxResultChan: muxResultChan, + }, nil +} + +func (w *combinedKubeWatcher) Stop() { + close(w.stopCh) + for _, watcher := range w.watchers { + watcher.Stop() + } +} + +func (w *combinedKubeWatcher) ResultChan() <-chan watch.Event { + return w.muxResultChan +} + +func TestCombinedWatcher(t *testing.T) { + testCtx := context.Background() + + cfg, err := config.GetConfig() + require.NoError(t, err) + scheme := runtime.NewScheme() + require.NoError(t, corev1.AddToScheme(scheme)) + kubecli, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme}) + require.NoError(t, err) + + // Create a temporary namespace + namespaceObj := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-namespace-", + }, + } + err = kubecli.Create(testCtx, namespaceObj) + require.NoError(t, err) + namespace := namespaceObj.Name + defer func() { + err := kubecli.Delete(testCtx, namespaceObj) + require.NoError(t, err) + }() + + watcher, err := newCombinedKubeWatcher(testCtx, kubecli, namespace, []client.ObjectList{ + &corev1.ConfigMapList{}, + &corev1.SecretList{}, + }) + require.NoError(t, err) + + var events []watch.Event + go func() { + for { + select { + case msg, ok := <-watcher.ResultChan(): + if !ok { + return + } + switch obj := msg.Object.(type) { + case *corev1.ConfigMap: + if obj.Name == "kube-root-ca.crt" { + continue + } + } + events = append(events, msg) + } + } + }() + + err = kubecli.Create(testCtx, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cm", + Namespace: namespace, + }, + }, &client.CreateOptions{}) + require.NoError(t, err) + err = kubecli.Create(testCtx, &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-secret", + Namespace: namespace, + }, + }, &client.CreateOptions{}) + require.NoError(t, err) + + expectedEvents := []watch.Event{ + {Type: watch.Added, Object: &corev1.ConfigMap{}}, + {Type: watch.Added, Object: &corev1.Secret{}}, + } + require.Eventually(t, func() bool { + if len(events) > len(expectedEvents) { + t.Errorf("Got more events than expected already: %s", events) + } + for i, realEvent := range events { + expectedEvent := expectedEvents[i] + realObjectType := reflect.TypeOf(realEvent.Object) + expectedObjectType := reflect.TypeOf(expectedEvent.Object) + if realEvent.Type != expectedEvent.Type || realObjectType != expectedObjectType { + t.Errorf("Event %d did not match expected event: %v != %v", i, realEvent, expectedEvent) + } + } + return len(events) == len(expectedEvents) + }, 3*time.Second, 100*time.Millisecond, "Events never matched expected list, got: %+v", events) +} diff --git a/test/e2e/watcher_test.go b/test/e2e/watcher_test.go index dfe88b92..a186aab5 100644 --- a/test/e2e/watcher_test.go +++ b/test/e2e/watcher_test.go @@ -4,26 +4,31 @@ import ( "context" . "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/watch" "sigs.k8s.io/controller-runtime/pkg/client" + + ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1" ) type NamespaceWatcher struct { kubeWatcher watch.Interface stopCh chan struct{} - events []WatcherEvent + events []testEvent } -type WatcherEvent struct { +type testEvent struct { Kind string Name string } func NewNamespaceWatcher(ctx context.Context, namespace string) *NamespaceWatcher { - watcher, err := k8sClient.Watch(ctx, &corev1.EventList{}, &client.ListOptions{ - Namespace: namespace, + watcher, err := newCombinedKubeWatcher(ctx, k8sClient, namespace, []client.ObjectList{ + &corev1.EventList{}, + &batchv1.JobList{}, + &ytv1.YtsaurusList{}, }) Expect(err).ToNot(HaveOccurred()) return &NamespaceWatcher{ @@ -41,7 +46,7 @@ func (w *NamespaceWatcher) Stop() { w.kubeWatcher.Stop() } -func (w *NamespaceWatcher) Events() []WatcherEvent { +func (w *NamespaceWatcher) Events() []testEvent { return w.events } @@ -51,17 +56,17 @@ func (w *NamespaceWatcher) loop() { case <-w.stopCh: return case ev := <-w.kubeWatcher.ResultChan(): - w.handleEvent(ev) + w.handleWatchEvent(ev) } } } -func (w *NamespaceWatcher) handleEvent(ev watch.Event) { - w.maybeLogEvent(ev) - w.maybeStoreEvent(ev) +func (w *NamespaceWatcher) handleWatchEvent(ev watch.Event) { + w.maybeLogWatchEvent(ev) + w.maybeStoreWatchEvent(ev) } -func (w *NamespaceWatcher) maybeLogEvent(ev watch.Event) { +func (w *NamespaceWatcher) maybeLogWatchEvent(ev watch.Event) { logEvent := func(event *corev1.Event) { log.Info("Event", "type", event.Type, @@ -80,14 +85,15 @@ func (w *NamespaceWatcher) maybeLogEvent(ev watch.Event) { } } -func (w *NamespaceWatcher) maybeStoreEvent(ev watch.Event) { +func (w *NamespaceWatcher) maybeStoreWatchEvent(ev watch.Event) { switch ev.Type { case watch.Added, watch.Modified: - if event, ok := ev.Object.(*corev1.Event); ok { - w.events = append(w.events, WatcherEvent{ - Kind: event.InvolvedObject.Kind, - Name: event.InvolvedObject.Name, - }) + if obj, ok := ev.Object.(*batchv1.Job); ok { + event := testEvent{ + Kind: obj.Kind, + Name: obj.Name, + } + w.events = append(w.events, event) } } }