diff --git a/test/e2e/checks_test.go b/test/e2e/checks_test.go new file mode 100644 index 00000000..63e7f00c --- /dev/null +++ b/test/e2e/checks_test.go @@ -0,0 +1,10 @@ +package controllers_test + +func getInitializingStageJobNames() []string { + return []string{ + "yt-master-init-job-default", + "yt-client-init-job-user", + "yt-scheduler-init-job-user", + "yt-scheduler-init-job-op-archive", + } +} diff --git a/test/e2e/suite_test.go b/test/e2e/suite_test.go index a03c0ccd..974fa70b 100644 --- a/test/e2e/suite_test.go +++ b/test/e2e/suite_test.go @@ -33,6 +33,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" clientgoretry "k8s.io/client-go/util/retry" @@ -42,6 +44,8 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -125,6 +129,36 @@ var _ = BeforeEach(func() { }) }) +func LogObjectEvents(ctx context.Context, namespace string) func() { + watcher, err := k8sClient.Watch(ctx, &corev1.EventList{}, &client.ListOptions{ + Namespace: namespace, + }) + Expect(err).ToNot(HaveOccurred()) + + logEvent := func(event *corev1.Event) { + log.Info("Event", + "type", event.Type, + "kind", event.InvolvedObject.Kind, + "name", event.InvolvedObject.Name, + "reason", event.Reason, + "message", event.Message, + ) + } + + go func() { + for ev := range watcher.ResultChan() { + switch ev.Type { + case watch.Added, watch.Modified: + if event, ok := ev.Object.(*corev1.Event); ok { + logEvent(event) + } + } + } + }() + + return watcher.Stop +} + func NewYtsaurusStatusTracker() func(*ytv1.Ytsaurus) bool { prevStatus := ytv1.YtsaurusStatus{} conditions := map[string]metav1.Condition{} diff --git a/test/e2e/watcher_test.go b/test/e2e/watcher_test.go index 2645bb39..d1bfffe9 100644 --- a/test/e2e/watcher_test.go +++ b/test/e2e/watcher_test.go @@ -6,30 +6,22 @@ import ( . "github.com/onsi/gomega" batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/watch" "sigs.k8s.io/controller-runtime/pkg/client" - ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1" "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/testutil" ) type NamespaceWatcher struct { kubeWatcher watch.Interface stopCh chan struct{} - events []testEvent -} - -type testEvent struct { - Kind string - Name string + events []watch.Event } func NewNamespaceWatcher(ctx context.Context, namespace string) *NamespaceWatcher { watcher, err := testutil.NewCombinedKubeWatcher(ctx, k8sClient, namespace, []client.ObjectList{ - &corev1.EventList{}, &batchv1.JobList{}, - &ytv1.YtsaurusList{}, + //&ytv1.YtsaurusList{}, }) Expect(err).ToNot(HaveOccurred()) return &NamespaceWatcher{ @@ -47,54 +39,31 @@ func (w *NamespaceWatcher) Stop() { w.kubeWatcher.Stop() } -func (w *NamespaceWatcher) Events() []testEvent { +func (w *NamespaceWatcher) GetRawEvents() []watch.Event { return w.events } +// TODO: not really generic, but good enough for the start. +func (w *NamespaceWatcher) GetCompletedJobNames() []string { + var result []string + for _, ev := range w.events { + if job, ok := ev.Object.(*batchv1.Job); ok { + if job.Status.Succeeded == 0 { + continue + } + result = append(result, job.Name) + } + } + return result +} + func (w *NamespaceWatcher) loop() { for { select { case <-w.stopCh: return case ev := <-w.kubeWatcher.ResultChan(): - w.handleWatchEvent(ev) - } - } -} - -func (w *NamespaceWatcher) handleWatchEvent(ev watch.Event) { - w.maybeLogWatchEvent(ev) - w.maybeStoreWatchEvent(ev) -} - -func (w *NamespaceWatcher) maybeLogWatchEvent(ev watch.Event) { - logEvent := func(event *corev1.Event) { - log.Info("Event", - "type", event.Type, - "kind", event.InvolvedObject.Kind, - "name", event.InvolvedObject.Name, - "reason", event.Reason, - "message", event.Message, - ) - } - - switch ev.Type { - case watch.Added, watch.Modified: - if event, ok := ev.Object.(*corev1.Event); ok { - logEvent(event) - } - } -} - -func (w *NamespaceWatcher) maybeStoreWatchEvent(ev watch.Event) { - switch ev.Type { - case watch.Added, watch.Modified: - if obj, ok := ev.Object.(*batchv1.Job); ok { - event := testEvent{ - Kind: obj.Kind, - Name: obj.Name, - } - w.events = append(w.events, event) + w.events = append(w.events, ev) } } } diff --git a/test/e2e/ytsaurus_controller_test.go b/test/e2e/ytsaurus_controller_test.go index 0cc5d71d..73d32242 100644 --- a/test/e2e/ytsaurus_controller_test.go +++ b/test/e2e/ytsaurus_controller_test.go @@ -215,7 +215,11 @@ var _ = Describe("Basic e2e test for Ytsaurus controller", Label("e2e"), func() } By("Logging all events in namespace") - DeferCleanup(namespaceWatcher.Stop) + logEventsCleanup := LogObjectEvents(ctx, namespace) + DeferCleanup(func() { + logEventsCleanup() + namespaceWatcher.Stop() + }) }) JustBeforeEach(func(ctx context.Context) { @@ -229,6 +233,10 @@ var _ = Describe("Basic e2e test for Ytsaurus controller", Label("e2e"), func() By("Checking that Ytsaurus state is equal to `Running`") EventuallyYtsaurus(ctx, ytsaurus, bootstrapTimeout).Should(HaveClusterStateRunning()) + By("Checking jobs order") + completedJobs := namespaceWatcher.GetCompletedJobNames() + Expect(completedJobs).Should(Equal(getInitializingStageJobNames())) + g = ytconfig.NewGenerator(ytsaurus, "local") ytClient = createYtsaurusClient(ytsaurus, namespace)