From 57a6b3abca7ec1d024d56e144d870583a1c61546 Mon Sep 17 00:00:00 2001
From: Aya Igarashi
Date: Thu, 23 Jan 2025 10:34:18 +0900
Subject: [PATCH 1/3] feat(syncer): add job controller

---
 deployments/syncer/templates/clusterrole.yaml |  20 ++
 provision.mk                                  |   2 +
 syncer/cmd/run.go                             |  20 ++
 syncer/internal/controller/job.go             | 203 ++++++++++++++++++
 syncer/internal/controller/job_test.go        | 166 ++++++++++++++
 5 files changed, 411 insertions(+)
 create mode 100644 syncer/internal/controller/job.go
 create mode 100644 syncer/internal/controller/job_test.go

diff --git a/deployments/syncer/templates/clusterrole.yaml b/deployments/syncer/templates/clusterrole.yaml
index 8a8760e..020b83e 100644
--- a/deployments/syncer/templates/clusterrole.yaml
+++ b/deployments/syncer/templates/clusterrole.yaml
@@ -17,3 +17,23 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - ""
+  resources:
+  - events
+  verbs:
+  - create
+  - patch
+  - update
+- apiGroups:
+  - coordination.k8s.io
+  resources:
+  - leases
+  verbs:
+  - create
+  - get
+  - list
+  - watch
+  - update
+  - patch
+  - delete
diff --git a/provision.mk b/provision.mk
index 575c8e8..3d06a40 100644
--- a/provision.mk
+++ b/provision.mk
@@ -13,6 +13,8 @@ provision-all: pull-llma-chart configure-llma-chart create-kind-cluster helm-app
 reapply-job-server: load-server-image helm-apply-cp-llma rollout-job-server
 .PHONY: reapply-job-dispatcher
 reapply-job-dispatcher: load-dispatcher-image helm-apply-cp-llma rollout-job-dispatcher
+.PHONY: reapply-job-syncer
+reapply-job-syncer: load-syncer-image helm-apply-cp-llma rollout-job-syncer
 
 # ------------------------------------------------------------------------------
 # chart repository
diff --git a/syncer/cmd/run.go b/syncer/cmd/run.go
index 3008988..4ddf2cb 100644
--- a/syncer/cmd/run.go
+++ b/syncer/cmd/run.go
@@ -6,9 +6,14 @@ import (
 	"log"
 
 	"github.com/go-logr/stdr"
+	v1 "github.com/llmariner/job-manager/api/v1"
 	"github.com/llmariner/job-manager/syncer/internal/config"
+	"github.com/llmariner/job-manager/syncer/internal/controller"
 	"github.com/spf13/cobra"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
 	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/healthz"
 	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
 )
 
@@ -41,6 +46,12 @@ func run(ctx context.Context, c *config.Config) error {
 	ctx = ctrl.LoggerInto(ctx, log)
 	ctrl.SetLogger(logger)
 
+	conn, err := grpc.NewClient(c.JobManagerServerSyncerServiceAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		return fmt.Errorf("failed to create grpc client: %s", err)
+	}
+	ssc := v1.NewSyncerServiceClient(conn)
+
 	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
 		LeaderElection:   c.KubernetesManager.EnableLeaderElection,
 		LeaderElectionID: c.KubernetesManager.LeaderElectionID,
@@ -53,5 +64,14 @@ func run(ctx context.Context, c *config.Config) error {
 	if err != nil {
 		return fmt.Errorf("create manager: %s", err)
 	}
+
+	if err := (&controller.JobController{}).SetupWithManager(mgr, ssc); err != nil {
+		return fmt.Errorf("setup job controller: %s", err)
+	}
+
+	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
+		return err
+	}
+
 	return mgr.Start(ctx)
 }
diff --git a/syncer/internal/controller/job.go b/syncer/internal/controller/job.go
new file mode 100644
index 0000000..a587f1e
--- /dev/null
+++ b/syncer/internal/controller/job.go
@@ -0,0 +1,203 @@
+package controller
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	v1 "github.com/llmariner/job-manager/api/v1"
batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +const ( + controllerName = "cloudnatix.com/job-controller" + + annoKeyClusterID = "cloudnatix.com/cluster-id" + annoKeyUID = "cloudnatix.com/uid" + annoKeyDeployedAt = "cloudnatix.com/deployed-at" +) + +var excludeLabelKeys = map[string]struct{}{ + "batch.kubernetes.io/controller-uid": {}, + "batch.kubernetes.io/job-name": {}, + "controller-uid": {}, + "job-name": {}, +} + +var jobGVR = schema.GroupVersionResource{ + Group: "batch", + Version: "v1", + Resource: "jobs", +} + +// JobController reconciles a Job object +type JobController struct { + recorder record.EventRecorder + k8sClient client.Client + ssClient v1.SyncerServiceClient +} + +// SetupWithManager sets up the controller with the Manager. +func (c *JobController) SetupWithManager(mgr ctrl.Manager, ssClient v1.SyncerServiceClient) error { + c.recorder = mgr.GetEventRecorderFor(controllerName) + c.k8sClient = mgr.GetClient() + c.ssClient = ssClient + return ctrl.NewControllerManagedBy(mgr). + For(&batchv1.Job{}). + Complete(c) +} + +// Reconcile reconciles a local Job object and deploy it to the worker cluster. +func (c *JobController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := ctrl.LoggerFrom(ctx) + + var job batchv1.Job + if err := c.k8sClient.Get(ctx, req.NamespacedName, &job); err != nil { + err = client.IgnoreNotFound(err) + if err != nil { + log.Error(err, "Failed to get job") + } + return ctrl.Result{}, err + } + + if mgr := ptr.Deref(job.Spec.ManagedBy, ""); mgr != controllerName { + log.V(4).Info("Skip job", "managedBy", mgr) + return ctrl.Result{}, nil + } + + if !job.DeletionTimestamp.IsZero() { + if !controllerutil.ContainsFinalizer(&job, controllerName) { + return ctrl.Result{}, nil + } + + clusterID := job.Annotations[annoKeyClusterID] + if clusterID != "" { + if _, err := c.ssClient.DeleteKubernetesObject(ctx, &v1.DeleteKubernetesObjectRequest{ + ClusterId: clusterID, + Namespace: req.Namespace, + Name: req.Name, + Group: jobGVR.Group, + Version: jobGVR.Version, + Resource: jobGVR.Resource, + }); err != nil { + log.Error(err, "Failed to delete job") + return ctrl.Result{}, err + } + } else { + log.V(1).Info("Cluster ID not found, this job might not be deployed") + } + + controllerutil.RemoveFinalizer(&job, controllerName) + if err := c.k8sClient.Update(ctx, &job); err != nil { + log.Error(err, "Failed to remove finalizer") + return ctrl.Result{}, client.IgnoreNotFound(err) + } + log.Info("Job finalizer is removed") + return ctrl.Result{}, nil + } + + if !controllerutil.ContainsFinalizer(&job, controllerName) { + controllerutil.AddFinalizer(&job, controllerName) + if err := c.k8sClient.Update(ctx, &job); err != nil { + log.Error(err, "add finalizer") + return ctrl.Result{}, client.IgnoreNotFound(err) + } + } + + if v := job.Annotations[annoKeyDeployedAt]; v != "" { + log.V(1).Info("Job is already deployed") + return ctrl.Result{}, nil + } + + deployObj := job.DeepCopy() + deployObj.ObjectMeta = metav1.ObjectMeta{ + Name: job.Name, + Namespace: job.Namespace, + Labels: job.Labels, + } + for k := range deployObj.Labels { + if _, ok := excludeLabelKeys[k]; ok { + 
+			delete(deployObj.Labels, k)
+		}
+	}
+	deployObj.Spec.ManagedBy = nil
+	if deployObj.Spec.Selector != nil {
+		for k := range deployObj.Spec.Selector.MatchLabels {
+			if _, ok := excludeLabelKeys[k]; ok {
+				delete(deployObj.Spec.Selector.MatchLabels, k)
+			}
+		}
+	}
+	for k := range deployObj.Spec.Template.Labels {
+		if _, ok := excludeLabelKeys[k]; ok {
+			delete(deployObj.Spec.Template.Labels, k)
+		}
+	}
+
+	obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&deployObj)
+	if err != nil {
+		log.Error(err, "Failed to convert job to unstructured")
+		return ctrl.Result{}, err
+	}
+	uobj := &unstructured.Unstructured{Object: obj}
+	data, err := uobj.MarshalJSON()
+	if err != nil {
+		log.Error(err, "Failed to marshal job")
+		return ctrl.Result{}, err
+	}
+
+	patchReq := &v1.PatchKubernetesObjectRequest{
+		Namespace: job.Namespace,
+		Name:      job.Name,
+		Group:     jobGVR.Group,
+		Version:   jobGVR.Version,
+		Resource:  jobGVR.Resource,
+		Data:      data,
+	}
+	var totalGPUs int
+	for _, container := range job.Spec.Template.Spec.Containers {
+		if container.Resources.Limits != nil {
+			if gpu, ok := container.Resources.Limits["nvidia.com/gpu"]; ok {
+				totalGPUs += int(gpu.Value())
+			}
+		}
+	}
+	if totalGPUs > 0 {
+		patchReq.Resources = &v1.PatchKubernetesObjectRequest_Resources{
+			GpuLimit: int32(totalGPUs),
+		}
+	}
+
+	resp, err := c.ssClient.PatchKubernetesObject(ctx, patchReq)
+	if err != nil {
+		log.Error(err, "Failed to patch job", "data", string(data))
+		return ctrl.Result{}, err
+	}
+	log.V(2).Info("Patched job", "response", resp)
+
+	patch := client.MergeFrom(&job)
+	newJob := job.DeepCopy()
+	if newJob.Annotations == nil {
+		newJob.Annotations = make(map[string]string)
+	}
+	newJob.Annotations[annoKeyClusterID] = resp.ClusterId
+	newJob.Annotations[annoKeyUID] = resp.Uid
+	newJob.Annotations[annoKeyDeployedAt] = metav1.Now().UTC().Format(time.RFC3339)
+	if err := c.k8sClient.Patch(ctx, newJob, patch); err != nil {
+		log.Error(err, "Failed to update job")
+		return ctrl.Result{}, err
+	}
+
+	c.recorder.Event(&job, "Normal", "Deployed", fmt.Sprintf("Job(%s) is deployed to the Cluster(%s)", resp.Uid, resp.ClusterId))
+	log.Info("Deployed job")
+	return ctrl.Result{}, nil
+}
diff --git a/syncer/internal/controller/job_test.go b/syncer/internal/controller/job_test.go
new file mode 100644
index 0000000..7ba0745
--- /dev/null
+++ b/syncer/internal/controller/job_test.go
@@ -0,0 +1,166 @@
+package controller
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/go-logr/logr/testr"
+	v1 "github.com/llmariner/job-manager/api/v1"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc"
+	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/utils/ptr"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+)
+
+func TestReconcileJob(t *testing.T) {
+	req := ctrl.Request{
+		NamespacedName: types.NamespacedName{
+			Namespace: "default",
+			Name:      "test",
+		},
+	}
+
+	createJob := func(mutateFn func(job *batchv1.Job)) *batchv1.Job {
+		labels := map[string]string{
+			"batch.kubernetes.io/controller-uid": "uid",
+			"batch.kubernetes.io/job-name":       "job",
+			"controller-uid":                     "uid",
+			"job-name":                           "test",
+			"custom":                             "test",
+		}
+		job := batchv1.Job{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      req.Name,
+				Namespace: req.Namespace,
+				Labels:    labels,
+			},
+			Spec: batchv1.JobSpec{
+				ManagedBy: ptr.To(controllerName),
+				Selector: &metav1.LabelSelector{
+					MatchLabels: map[string]string{
+						"job-name": "test",
+					},
+				},
+				Template: corev1.PodTemplateSpec{
+					ObjectMeta: metav1.ObjectMeta{
+						Labels: labels,
+					},
+					Spec: corev1.PodSpec{
+						RestartPolicy: corev1.RestartPolicyNever,
+						Containers: []corev1.Container{
+							{
+								Name:  "hello",
+								Image: "busybox",
+								Args:  []string{"echo", "test"},
+							},
+						},
+					},
+				},
+			},
+		}
+		if mutateFn != nil {
+			mutateFn(&job)
+		}
+		return &job
+	}
+
+	var tests = []struct {
+		name string
+		job  *batchv1.Job
+
+		wantPatch  bool
+		wantDelete bool
+		assertFn   func(t *testing.T, job batchv1.Job)
+	}{
+		{
+			name: "deploy",
+			job:  createJob(nil),
+			assertFn: func(t *testing.T, job batchv1.Job) {
+				assert.Contains(t, job.Annotations, annoKeyClusterID)
+				assert.Contains(t, job.Annotations, annoKeyDeployedAt)
+				assert.Contains(t, job.Annotations, annoKeyUID)
+				assert.Contains(t, job.Finalizers, controllerName)
+			},
+			wantPatch: true,
+		},
+		{
+			name: "no change",
+			job: createJob(func(job *batchv1.Job) {
+				job.Finalizers = append(job.Finalizers, controllerName)
+				job.Annotations = map[string]string{annoKeyDeployedAt: metav1.Now().Format(time.RFC3339)}
+			}),
+		},
+		{
+			name: "finalize",
+			job: createJob(func(job *batchv1.Job) {
+				job.DeletionTimestamp = ptr.To(metav1.Now())
+				job.Annotations = map[string]string{annoKeyClusterID: "cid"}
+				job.Finalizers = append(job.Finalizers, controllerName)
+			}),
+			wantDelete: true,
+		},
+		{
+			name: "already deleted",
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			var objs []runtime.Object
+			if test.job != nil {
+				objs = append(objs, test.job)
+			}
+			ssClient := &fakeSyncerServiceClient{}
+			jobCtr := JobController{
+				recorder:  record.NewFakeRecorder(5),
+				k8sClient: fake.NewFakeClient(objs...),
+				ssClient:  ssClient,
+			}
+
+			ctx := context.Background()
+			ctx = ctrl.LoggerInto(ctx, testr.NewWithOptions(t, testr.Options{Verbosity: 8}))
+
+			_, err := jobCtr.Reconcile(ctx, req)
+			assert.NoError(t, err)
+
+			if test.wantPatch {
+				assert.Equal(t, 1, ssClient.patchCount)
+			}
+			if test.wantDelete {
+				assert.Equal(t, 1, ssClient.delCount)
+			}
+
+			if test.assertFn != nil {
+				var gotJob batchv1.Job
+				err = jobCtr.k8sClient.Get(ctx, req.NamespacedName, &gotJob)
+				assert.NoError(t, err)
+				test.assertFn(t, gotJob)
+			}
+		})
+	}
+}
+
+type fakeSyncerServiceClient struct {
+	patchCount int
+	delCount   int
+}
+
+func (s *fakeSyncerServiceClient) PatchKubernetesObject(ctx context.Context, in *v1.PatchKubernetesObjectRequest, opts ...grpc.CallOption) (*v1.PatchKubernetesObjectResponse, error) {
+	s.patchCount++
+	return &v1.PatchKubernetesObjectResponse{
+		ClusterId: "cid",
+		Uid:       "uid",
+	}, nil
+}
+
+func (s *fakeSyncerServiceClient) DeleteKubernetesObject(ctx context.Context, in *v1.DeleteKubernetesObjectRequest, opts ...grpc.CallOption) (*v1.DeleteKubernetesObjectResponse, error) {
+	s.delCount++
+	return &v1.DeleteKubernetesObjectResponse{}, nil
+}

From 64a21b7e34c6543729968e517e8d7a8188ab8232 Mon Sep 17 00:00:00 2001
From: "Aya (Igarashi) Ozawa"
Date: Thu, 23 Jan 2025 13:43:22 +0900
Subject: [PATCH 2/3] Update syncer/internal/controller/job.go

Co-authored-by: Kenji Kaneda
---
 syncer/internal/controller/job.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/syncer/internal/controller/job.go b/syncer/internal/controller/job.go
index a587f1e..d062aee 100644
--- a/syncer/internal/controller/job.go
+++ b/syncer/internal/controller/job.go
@@ -56,7 +56,7 @@ func (c *JobController) SetupWithManager(mgr ctrl.Manager, ssClient v1.SyncerSer
 		Complete(c)
 }
 
-// Reconcile reconciles a local Job object and deploy it to the worker cluster.
+// Reconcile reconciles a local Job object and deploys it to the worker cluster.
 func (c *JobController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	log := ctrl.LoggerFrom(ctx)
 

From ff1e0cc3c0b8aec23b436f5d41e4479e7bc32eaf Mon Sep 17 00:00:00 2001
From: Aya Igarashi
Date: Thu, 23 Jan 2025 13:53:02 +0900
Subject: [PATCH 3/3] fixup! feat(syncer): add job controller

---
 syncer/internal/controller/job.go      | 23 +++++++++++++----------
 syncer/internal/controller/job_test.go |  8 ++++----
 2 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/syncer/internal/controller/job.go b/syncer/internal/controller/job.go
index d062aee..98554a8 100644
--- a/syncer/internal/controller/job.go
+++ b/syncer/internal/controller/job.go
@@ -19,11 +19,13 @@ import (
 )
 
 const (
-	controllerName = "cloudnatix.com/job-controller"
+	domain             = "cloudnatix.com"
+	controllerName     = "job-controller"
+	fullControllerName = domain + "/" + controllerName
 
-	annoKeyClusterID  = "cloudnatix.com/cluster-id"
-	annoKeyUID        = "cloudnatix.com/uid"
-	annoKeyDeployedAt = "cloudnatix.com/deployed-at"
+	annoKeyClusterID  = domain + "/cluster-id"
+	annoKeyUID        = domain + "/uid"
+	annoKeyDeployedAt = domain + "/deployed-at"
 )
 
 var excludeLabelKeys = map[string]struct{}{
@@ -48,10 +50,11 @@ type JobController struct {
 
 // SetupWithManager sets up the controller with the Manager.
 func (c *JobController) SetupWithManager(mgr ctrl.Manager, ssClient v1.SyncerServiceClient) error {
-	c.recorder = mgr.GetEventRecorderFor(controllerName)
+	c.recorder = mgr.GetEventRecorderFor(fullControllerName)
 	c.k8sClient = mgr.GetClient()
 	c.ssClient = ssClient
 	return ctrl.NewControllerManagedBy(mgr).
+		Named(controllerName).
 		For(&batchv1.Job{}).
 		Complete(c)
 }
@@ -69,13 +72,13 @@ func (c *JobController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
 		return ctrl.Result{}, err
 	}
 
-	if mgr := ptr.Deref(job.Spec.ManagedBy, ""); mgr != controllerName {
+	if mgr := ptr.Deref(job.Spec.ManagedBy, ""); mgr != fullControllerName {
 		log.V(4).Info("Skip job", "managedBy", mgr)
 		return ctrl.Result{}, nil
 	}
 
 	if !job.DeletionTimestamp.IsZero() {
-		if !controllerutil.ContainsFinalizer(&job, controllerName) {
+		if !controllerutil.ContainsFinalizer(&job, fullControllerName) {
 			return ctrl.Result{}, nil
 		}
 
@@ -96,7 +99,7 @@ func (c *JobController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
 			log.V(1).Info("Cluster ID not found, this job might not be deployed")
 		}
 
-		controllerutil.RemoveFinalizer(&job, controllerName)
+		controllerutil.RemoveFinalizer(&job, fullControllerName)
 		if err := c.k8sClient.Update(ctx, &job); err != nil {
 			log.Error(err, "Failed to remove finalizer")
 			return ctrl.Result{}, client.IgnoreNotFound(err)
@@ -105,8 +108,8 @@ func (c *JobController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
 		return ctrl.Result{}, nil
 	}
 
-	if !controllerutil.ContainsFinalizer(&job, controllerName) {
-		controllerutil.AddFinalizer(&job, controllerName)
+	if !controllerutil.ContainsFinalizer(&job, fullControllerName) {
+		controllerutil.AddFinalizer(&job, fullControllerName)
 		if err := c.k8sClient.Update(ctx, &job); err != nil {
 			log.Error(err, "add finalizer")
 			return ctrl.Result{}, client.IgnoreNotFound(err)
diff --git a/syncer/internal/controller/job_test.go b/syncer/internal/controller/job_test.go
index 7ba0745..8de9031 100644
--- a/syncer/internal/controller/job_test.go
+++ b/syncer/internal/controller/job_test.go
@@ -43,7 +43,7 @@ func TestReconcileJob(t *testing.T) {
 				Labels:    labels,
 			},
 			Spec: batchv1.JobSpec{
-				ManagedBy: ptr.To(controllerName),
+				ManagedBy: ptr.To(fullControllerName),
 				Selector: &metav1.LabelSelector{
 					MatchLabels: map[string]string{
 						"job-name": "test",
@@ -87,14 +87,14 @@ func TestReconcileJob(t *testing.T) {
 				assert.Contains(t, job.Annotations, annoKeyClusterID)
 				assert.Contains(t, job.Annotations, annoKeyDeployedAt)
 				assert.Contains(t, job.Annotations, annoKeyUID)
-				assert.Contains(t, job.Finalizers, controllerName)
+				assert.Contains(t, job.Finalizers, fullControllerName)
 			},
 			wantPatch: true,
 		},
 		{
 			name: "no change",
 			job: createJob(func(job *batchv1.Job) {
-				job.Finalizers = append(job.Finalizers, controllerName)
+				job.Finalizers = append(job.Finalizers, fullControllerName)
 				job.Annotations = map[string]string{annoKeyDeployedAt: metav1.Now().Format(time.RFC3339)}
 			}),
 		},
@@ -103,7 +103,7 @@ func TestReconcileJob(t *testing.T) {
 			job: createJob(func(job *batchv1.Job) {
 				job.DeletionTimestamp = ptr.To(metav1.Now())
 				job.Annotations = map[string]string{annoKeyClusterID: "cid"}
-				job.Finalizers = append(job.Finalizers, controllerName)
+				job.Finalizers = append(job.Finalizers, fullControllerName)
 			}),
 			wantDelete: true,
 		},
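
Usage sketch (illustrative only; the Job name, namespace, and image below are hypothetical, and spec.managedBy assumes a Kubernetes release where the batch/v1 JobManagedBy feature is available): a Job opts in to this controller by setting spec.managedBy to the domain-qualified controller name, and the controller sums any nvidia.com/gpu limits on its containers into the PatchKubernetesObject request it sends to the syncer service.

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: gpu-example            # hypothetical name
      namespace: default
    spec:
      managedBy: cloudnatix.com/job-controller   # equals fullControllerName from PATCH 3/3
      template:
        spec:
          restartPolicy: Never
          containers:
          - name: main
            image: busybox
            args: ["echo", "hello"]
            resources:
              limits:
                nvidia.com/gpu: 1  # summed into PatchKubernetesObjectRequest_Resources.GpuLimit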