Skip to content

Commit

Permalink
Add check for jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
l0kix2 committed Jan 13, 2025
1 parent d907079 commit 1a90e4f
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 50 deletions.
10 changes: 10 additions & 0 deletions test/e2e/checks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package controllers_test

func getInitializingStageJobNames() []string {
return []string{
"yt-master-init-job-default",
"yt-client-init-job-user",
"yt-scheduler-init-job-user",
"yt-scheduler-init-job-op-archive",
}
}
34 changes: 34 additions & 0 deletions test/e2e/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"

clientgoscheme "k8s.io/client-go/kubernetes/scheme"
clientgoretry "k8s.io/client-go/util/retry"

Expand All @@ -42,6 +44,8 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

corev1 "k8s.io/api/core/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

Expand Down Expand Up @@ -125,6 +129,36 @@ var _ = BeforeEach(func() {
})
})

func LogObjectEvents(ctx context.Context, namespace string) func() {
watcher, err := k8sClient.Watch(ctx, &corev1.EventList{}, &client.ListOptions{
Namespace: namespace,
})
Expect(err).ToNot(HaveOccurred())

logEvent := func(event *corev1.Event) {
log.Info("Event",
"type", event.Type,
"kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"reason", event.Reason,
"message", event.Message,
)
}

go func() {
for ev := range watcher.ResultChan() {
switch ev.Type {
case watch.Added, watch.Modified:
if event, ok := ev.Object.(*corev1.Event); ok {
logEvent(event)
}
}
}
}()

return watcher.Stop
}

func NewYtsaurusStatusTracker() func(*ytv1.Ytsaurus) bool {
prevStatus := ytv1.YtsaurusStatus{}
conditions := map[string]metav1.Condition{}
Expand Down
67 changes: 18 additions & 49 deletions test/e2e/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,22 @@ import (
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/controller-runtime/pkg/client"

ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/testutil"
)

type NamespaceWatcher struct {
kubeWatcher watch.Interface
stopCh chan struct{}
events []testEvent
}

type testEvent struct {
Kind string
Name string
events []watch.Event
}

func NewNamespaceWatcher(ctx context.Context, namespace string) *NamespaceWatcher {
watcher, err := testutil.NewCombinedKubeWatcher(ctx, k8sClient, namespace, []client.ObjectList{
&corev1.EventList{},
&batchv1.JobList{},
&ytv1.YtsaurusList{},
//&ytv1.YtsaurusList{},

Check failure on line 24 in test/e2e/watcher_test.go

View workflow job for this annotation

GitHub Actions / Run checks

commentFormatting: put a space between `//` and comment text (gocritic)
})
Expect(err).ToNot(HaveOccurred())
return &NamespaceWatcher{
Expand All @@ -47,54 +39,31 @@ func (w *NamespaceWatcher) Stop() {
w.kubeWatcher.Stop()
}

func (w *NamespaceWatcher) Events() []testEvent {
func (w *NamespaceWatcher) GetRawEvents() []watch.Event {
return w.events
}

// TODO: not really generic, but good enough for the start.
func (w *NamespaceWatcher) GetCompletedJobNames() []string {
var result []string
for _, ev := range w.events {
if job, ok := ev.Object.(*batchv1.Job); ok {
if job.Status.Succeeded == 0 {
continue
}
result = append(result, job.Name)
}
}
return result
}

func (w *NamespaceWatcher) loop() {
for {
select {
case <-w.stopCh:
return
case ev := <-w.kubeWatcher.ResultChan():
w.handleWatchEvent(ev)
}
}
}

func (w *NamespaceWatcher) handleWatchEvent(ev watch.Event) {
w.maybeLogWatchEvent(ev)
w.maybeStoreWatchEvent(ev)
}

func (w *NamespaceWatcher) maybeLogWatchEvent(ev watch.Event) {
logEvent := func(event *corev1.Event) {
log.Info("Event",
"type", event.Type,
"kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"reason", event.Reason,
"message", event.Message,
)
}

switch ev.Type {
case watch.Added, watch.Modified:
if event, ok := ev.Object.(*corev1.Event); ok {
logEvent(event)
}
}
}

func (w *NamespaceWatcher) maybeStoreWatchEvent(ev watch.Event) {
switch ev.Type {
case watch.Added, watch.Modified:
if obj, ok := ev.Object.(*batchv1.Job); ok {
event := testEvent{
Kind: obj.Kind,
Name: obj.Name,
}
w.events = append(w.events, event)
w.events = append(w.events, ev)
}
}
}
10 changes: 9 additions & 1 deletion test/e2e/ytsaurus_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,11 @@ var _ = Describe("Basic e2e test for Ytsaurus controller", Label("e2e"), func()
}

By("Logging all events in namespace")
DeferCleanup(namespaceWatcher.Stop)
logEventsCleanup := LogObjectEvents(ctx, namespace)
DeferCleanup(func() {
logEventsCleanup()
namespaceWatcher.Stop()
})
})

JustBeforeEach(func(ctx context.Context) {
Expand All @@ -229,6 +233,10 @@ var _ = Describe("Basic e2e test for Ytsaurus controller", Label("e2e"), func()
By("Checking that Ytsaurus state is equal to `Running`")
EventuallyYtsaurus(ctx, ytsaurus, bootstrapTimeout).Should(HaveClusterStateRunning())

By("Checking jobs order")
completedJobs := namespaceWatcher.GetCompletedJobNames()
Expect(completedJobs).Should(Equal(getInitializingStageJobNames()))

g = ytconfig.NewGenerator(ytsaurus, "local")

ytClient = createYtsaurusClient(ytsaurus, namespace)
Expand Down

0 comments on commit 1a90e4f

Please sign in to comment.