Skip to content

Commit

Permalink
Refactor namespace events logger
Browse files Browse the repository at this point in the history
  • Loading branch information
l0kix2 committed Jan 13, 2025
1 parent caf6631 commit faa7c88
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 55 deletions.
54 changes: 0 additions & 54 deletions test/e2e/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"

clientgoscheme "k8s.io/client-go/kubernetes/scheme"
clientgoretry "k8s.io/client-go/util/retry"

Expand All @@ -44,8 +42,6 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

corev1 "k8s.io/api/core/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

Expand Down Expand Up @@ -129,56 +125,6 @@ var _ = BeforeEach(func() {
})
})

type NamespaceWatcher struct {
kubeWatcher watch.Interface
stopCh chan struct{}
}

func NewNamespaceWatcher(ctx context.Context, namespace string) *NamespaceWatcher {
watcher, err := k8sClient.Watch(ctx, &corev1.EventList{}, &client.ListOptions{
Namespace: namespace,
})
Expect(err).ToNot(HaveOccurred())
return &NamespaceWatcher{
kubeWatcher: watcher,
}
}

func (w *NamespaceWatcher) Start() {
go w.loop()
}

func (w *NamespaceWatcher) loop() {
logEvent := func(event *corev1.Event) {
log.Info("Event",
"type", event.Type,
"kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"reason", event.Reason,
"message", event.Message,
)
}

for {
select {
case <-w.stopCh:
return
case ev := <-w.kubeWatcher.ResultChan():
switch ev.Type {
case watch.Added, watch.Modified:
if event, ok := ev.Object.(*corev1.Event); ok {
logEvent(event)
}
}
}
}
}

func (w *NamespaceWatcher) Stop() {
close(w.stopCh)
w.kubeWatcher.Stop()
}

func NewYtsaurusStatusTracker() func(*ytv1.Ytsaurus) bool {
prevStatus := ytv1.YtsaurusStatus{}
conditions := map[string]metav1.Condition{}
Expand Down
93 changes: 93 additions & 0 deletions test/e2e/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package controllers_test

import (
"context"

. "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type NamespaceWatcher struct {
kubeWatcher watch.Interface
stopCh chan struct{}
events []WatcherEvent
}

type WatcherEvent struct {
Kind string
Name string
}

func NewNamespaceWatcher(ctx context.Context, namespace string) *NamespaceWatcher {
watcher, err := k8sClient.Watch(ctx, &corev1.EventList{}, &client.ListOptions{
Namespace: namespace,
})
Expect(err).ToNot(HaveOccurred())
return &NamespaceWatcher{
kubeWatcher: watcher,
stopCh: make(chan struct{}),
}
}

func (w *NamespaceWatcher) Start() {
go w.loop()
}

func (w *NamespaceWatcher) Stop() {
close(w.stopCh)
w.kubeWatcher.Stop()
}

func (w *NamespaceWatcher) Events() []WatcherEvent {
return w.events
}

func (w *NamespaceWatcher) loop() {
for {
select {
case <-w.stopCh:
return
case ev := <-w.kubeWatcher.ResultChan():
w.handleEvent(ev)
}
}
}

func (w *NamespaceWatcher) handleEvent(ev watch.Event) {
w.maybeLogEvent(ev)
w.maybeStoreEvent(ev)
}

func (w *NamespaceWatcher) maybeLogEvent(ev watch.Event) {
logEvent := func(event *corev1.Event) {
log.Info("Event",
"type", event.Type,
"kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"reason", event.Reason,
"message", event.Message,
)
}

switch ev.Type {
case watch.Added, watch.Modified:
if event, ok := ev.Object.(*corev1.Event); ok {
logEvent(event)
}
}
}

func (w *NamespaceWatcher) maybeStoreEvent(ev watch.Event) {
switch ev.Type {
case watch.Added, watch.Modified:
if event, ok := ev.Object.(*corev1.Event); ok {
w.events = append(w.events, WatcherEvent{
Kind: event.InvolvedObject.Kind,
Name: event.InvolvedObject.Name,
})
}
}
}
1 change: 0 additions & 1 deletion test/e2e/ytsaurus_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ var _ = Describe("Basic e2e test for Ytsaurus controller", Label("e2e"), func()
})

JustBeforeEach(func(ctx context.Context) {

By("Creating resource objects")
for _, object := range objects {
By(fmt.Sprintf("Creating %v %v", GetObjectGVK(object), object.GetName()))
Expand Down

0 comments on commit faa7c88

Please sign in to comment.