Skip to content

Commit

Permalink
Add event aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
l0kix2 committed Jan 13, 2025
1 parent faa7c88 commit 92a80e9
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 16 deletions.
151 changes: 151 additions & 0 deletions test/e2e/combined_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package controllers_test

import (
"context"
"reflect"
"testing"
"time"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

type combinedKubeWatcher struct {
watchers []watch.Interface
stopCh chan struct{}
muxResultChan chan watch.Event
}

func newCombinedKubeWatcher(ctx context.Context, kubecli client.WithWatch, namespace string, lists []client.ObjectList) (*combinedKubeWatcher, error) {
muxResultChan := make(chan watch.Event)
stopCh := make(chan struct{})

var watchers []watch.Interface
for _, objList := range lists {
watcher, err := kubecli.Watch(ctx, objList, &client.ListOptions{
Namespace: namespace,
})
if err != nil {
return nil, err
}
go func(ch <-chan watch.Event) {
for {
select {
case msg, ok := <-ch:
if !ok {
return
}
muxResultChan <- msg
case <-stopCh:
return
}
}
}(watcher.ResultChan())
watchers = append(watchers, watcher)
}
return &combinedKubeWatcher{
watchers: watchers,
stopCh: stopCh,
muxResultChan: muxResultChan,
}, nil
}

func (w *combinedKubeWatcher) Stop() {
close(w.stopCh)
for _, watcher := range w.watchers {
watcher.Stop()
}
}

func (w *combinedKubeWatcher) ResultChan() <-chan watch.Event {
return w.muxResultChan
}

func TestCombinedWatcher(t *testing.T) {
testCtx := context.Background()

cfg, err := config.GetConfig()
require.NoError(t, err)
scheme := runtime.NewScheme()
require.NoError(t, corev1.AddToScheme(scheme))
kubecli, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme})
require.NoError(t, err)

// Create a temporary namespace
namespaceObj := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-namespace-",
},
}
err = kubecli.Create(testCtx, namespaceObj)
require.NoError(t, err)
namespace := namespaceObj.Name
defer func() {
err := kubecli.Delete(testCtx, namespaceObj)
require.NoError(t, err)
}()

watcher, err := newCombinedKubeWatcher(testCtx, kubecli, namespace, []client.ObjectList{
&corev1.ConfigMapList{},
&corev1.SecretList{},
})
require.NoError(t, err)

var events []watch.Event
go func() {
for {

Check failure on line 101 in test/e2e/combined_watcher_test.go

View workflow job for this annotation

GitHub Actions / Run checks

S1000: should use for range instead of for { select {} } (gosimple)
select {
case msg, ok := <-watcher.ResultChan():
if !ok {
return
}
switch obj := msg.Object.(type) {

Check failure on line 107 in test/e2e/combined_watcher_test.go

View workflow job for this annotation

GitHub Actions / Run checks

singleCaseSwitch: should rewrite switch statement to if statement (gocritic)
case *corev1.ConfigMap:
if obj.Name == "kube-root-ca.crt" {
continue
}
}
events = append(events, msg)
}
}
}()

err = kubecli.Create(testCtx, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "test-cm",
Namespace: namespace,
},
}, &client.CreateOptions{})
require.NoError(t, err)
err = kubecli.Create(testCtx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test-secret",
Namespace: namespace,
},
}, &client.CreateOptions{})
require.NoError(t, err)

expectedEvents := []watch.Event{
{Type: watch.Added, Object: &corev1.ConfigMap{}},
{Type: watch.Added, Object: &corev1.Secret{}},
}
require.Eventually(t, func() bool {
if len(events) > len(expectedEvents) {
t.Errorf("Got more events than expected already: %s", events)
}
for i, realEvent := range events {
expectedEvent := expectedEvents[i]
realObjectType := reflect.TypeOf(realEvent.Object)
expectedObjectType := reflect.TypeOf(expectedEvent.Object)
if realEvent.Type != expectedEvent.Type || realObjectType != expectedObjectType {
t.Errorf("Event %d did not match expected event: %v != %v", i, realEvent, expectedEvent)
}
}
return len(events) == len(expectedEvents)
}, 3*time.Second, 100*time.Millisecond, "Events never matched expected list, got: %+v", events)
}
38 changes: 22 additions & 16 deletions test/e2e/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,31 @@ import (
"context"

. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/controller-runtime/pkg/client"

ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1"
)

type NamespaceWatcher struct {
kubeWatcher watch.Interface
stopCh chan struct{}
events []WatcherEvent
events []testEvent
}

type WatcherEvent struct {
type testEvent struct {
Kind string
Name string
}

func NewNamespaceWatcher(ctx context.Context, namespace string) *NamespaceWatcher {
watcher, err := k8sClient.Watch(ctx, &corev1.EventList{}, &client.ListOptions{
Namespace: namespace,
watcher, err := newCombinedKubeWatcher(ctx, k8sClient, namespace, []client.ObjectList{
&corev1.EventList{},
&batchv1.JobList{},
&ytv1.YtsaurusList{},
})
Expect(err).ToNot(HaveOccurred())
return &NamespaceWatcher{
Expand All @@ -41,7 +46,7 @@ func (w *NamespaceWatcher) Stop() {
w.kubeWatcher.Stop()
}

func (w *NamespaceWatcher) Events() []WatcherEvent {
func (w *NamespaceWatcher) Events() []testEvent {
return w.events
}

Expand All @@ -51,17 +56,17 @@ func (w *NamespaceWatcher) loop() {
case <-w.stopCh:
return
case ev := <-w.kubeWatcher.ResultChan():
w.handleEvent(ev)
w.handleWatchEvent(ev)
}
}
}

func (w *NamespaceWatcher) handleEvent(ev watch.Event) {
w.maybeLogEvent(ev)
w.maybeStoreEvent(ev)
func (w *NamespaceWatcher) handleWatchEvent(ev watch.Event) {
w.maybeLogWatchEvent(ev)
w.maybeStoreWatchEvent(ev)
}

func (w *NamespaceWatcher) maybeLogEvent(ev watch.Event) {
func (w *NamespaceWatcher) maybeLogWatchEvent(ev watch.Event) {
logEvent := func(event *corev1.Event) {
log.Info("Event",
"type", event.Type,
Expand All @@ -80,14 +85,15 @@ func (w *NamespaceWatcher) maybeLogEvent(ev watch.Event) {
}
}

func (w *NamespaceWatcher) maybeStoreEvent(ev watch.Event) {
func (w *NamespaceWatcher) maybeStoreWatchEvent(ev watch.Event) {
switch ev.Type {
case watch.Added, watch.Modified:
if event, ok := ev.Object.(*corev1.Event); ok {
w.events = append(w.events, WatcherEvent{
Kind: event.InvolvedObject.Kind,
Name: event.InvolvedObject.Name,
})
if obj, ok := ev.Object.(*batchv1.Job); ok {
event := testEvent{
Kind: obj.Kind,
Name: obj.Name,
}
w.events = append(w.events, event)
}
}
}

0 comments on commit 92a80e9

Please sign in to comment.