feat(syncer): add job controller #347

Merged 3 commits on Jan 23, 2025
Changes from 1 commit
20 changes: 20 additions & 0 deletions deployments/syncer/templates/clusterrole.yaml
@@ -17,3 +17,23 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- update
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- get
- list
- watch
- update
- patch
- delete
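
For context: these RBAC additions match the code changes below. The events permissions are needed by the controller's EventRecorder, and the coordination.k8s.io leases permissions are needed for controller-runtime's lease-based leader election, which run.go enables.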
2 changes: 2 additions & 0 deletions provision.mk
@@ -13,6 +13,8 @@ provision-all: pull-llma-chart configure-llma-chart create-kind-cluster helm-app
reapply-job-server: load-server-image helm-apply-cp-llma rollout-job-server
.PHONY: reapply-job-dispatcher
reapply-job-dispatcher: load-dispatcher-image helm-apply-cp-llma rollout-job-dispatcher
.PHONY: reapply-job-syncer
reapply-job-syncer: load-syncer-image helm-apply-cp-llma rollout-job-syncer

# ------------------------------------------------------------------------------
# chart repository
20 changes: 20 additions & 0 deletions syncer/cmd/run.go
@@ -6,9 +6,14 @@ import (
"log"

"github.com/go-logr/stdr"
v1 "github.com/llmariner/job-manager/api/v1"
"github.com/llmariner/job-manager/syncer/internal/config"
"github.com/llmariner/job-manager/syncer/internal/controller"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

@@ -41,6 +46,12 @@ func run(ctx context.Context, c *config.Config) error {
ctx = ctrl.LoggerInto(ctx, log)
ctrl.SetLogger(logger)

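// Create a (lazily connecting) gRPC client for the job-manager syncer service.
// Insecure transport credentials assume the traffic stays inside the cluster.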
conn, err := grpc.NewClient(c.JobManagerServerSyncerServiceAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("failed to create grpc client: %s", err)
}
ssc := v1.NewSyncerServiceClient(conn)

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
LeaderElection: c.KubernetesManager.EnableLeaderElection,
LeaderElectionID: c.KubernetesManager.LeaderElectionID,
@@ -53,5 +64,14 @@ func run(ctx context.Context, c *config.Config) error {
if err != nil {
return fmt.Errorf("create manager: %s", err)
}

if err := (&controller.JobController{}).SetupWithManager(mgr, ssc); err != nil {
return fmt.Errorf("setup job controller: %s", err)
}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
return err
}

return mgr.Start(ctx)
}
203 changes: 203 additions & 0 deletions syncer/internal/controller/job.go
@@ -0,0 +1,203 @@
package controller

import (
"context"
"fmt"
"time"

v1 "github.com/llmariner/job-manager/api/v1"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

const (
controllerName = "cloudnatix.com/job-controller"

annoKeyClusterID = "cloudnatix.com/cluster-id"
annoKeyUID = "cloudnatix.com/uid"
annoKeyDeployedAt = "cloudnatix.com/deployed-at"
)

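// excludeLabelKeys are labels injected by the local Job controller; they are
// stripped before deployment so the worker cluster's controller can set its own.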
var excludeLabelKeys = map[string]struct{}{
"batch.kubernetes.io/controller-uid": {},
"batch.kubernetes.io/job-name": {},
"controller-uid": {},
"job-name": {},
}

var jobGVR = schema.GroupVersionResource{
Group: "batch",
Version: "v1",
Resource: "jobs",
}

// JobController reconciles a Job object
type JobController struct {
recorder record.EventRecorder
k8sClient client.Client
ssClient v1.SyncerServiceClient
}

// SetupWithManager sets up the controller with the Manager.
func (c *JobController) SetupWithManager(mgr ctrl.Manager, ssClient v1.SyncerServiceClient) error {
c.recorder = mgr.GetEventRecorderFor(controllerName)
c.k8sClient = mgr.GetClient()
c.ssClient = ssClient
return ctrl.NewControllerManagedBy(mgr).
For(&batchv1.Job{}).
Complete(c)
}

// Reconcile reconciles a local Job object and deploys it to the worker cluster.
func (c *JobController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

var job batchv1.Job
if err := c.k8sClient.Get(ctx, req.NamespacedName, &job); err != nil {
err = client.IgnoreNotFound(err)
if err != nil {
log.Error(err, "Failed to get job")
}
return ctrl.Result{}, err
}

if mgr := ptr.Deref(job.Spec.ManagedBy, ""); mgr != controllerName {
log.V(4).Info("Skip job", "managedBy", mgr)
return ctrl.Result{}, nil
}

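// The local Job is being deleted: delete the mirrored Job on the worker
// cluster first, then remove the finalizer so deletion can complete.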
if !job.DeletionTimestamp.IsZero() {
if !controllerutil.ContainsFinalizer(&job, controllerName) {
return ctrl.Result{}, nil
}

clusterID := job.Annotations[annoKeyClusterID]
if clusterID != "" {
if _, err := c.ssClient.DeleteKubernetesObject(ctx, &v1.DeleteKubernetesObjectRequest{
ClusterId: clusterID,
Namespace: req.Namespace,
Name: req.Name,
Group: jobGVR.Group,
Version: jobGVR.Version,
Resource: jobGVR.Resource,
}); err != nil {
log.Error(err, "Failed to delete job")
return ctrl.Result{}, err
}
} else {
log.V(1).Info("Cluster ID not found, this job might not be deployed")
}

controllerutil.RemoveFinalizer(&job, controllerName)
if err := c.k8sClient.Update(ctx, &job); err != nil {
log.Error(err, "Failed to remove finalizer")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
log.Info("Job finalizer is removed")
return ctrl.Result{}, nil
}

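// Add the finalizer before deploying so that deleting the local Job always
// triggers cleanup of the remote copy.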
if !controllerutil.ContainsFinalizer(&job, controllerName) {
controllerutil.AddFinalizer(&job, controllerName)
if err := c.k8sClient.Update(ctx, &job); err != nil {
log.Error(err, "add finalizer")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}

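// Idempotence guard: once the deployed-at annotation is set, the Job has
// already been synced, so the annotation patch below does not retrigger a deploy.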
if v := job.Annotations[annoKeyDeployedAt]; v != "" {
log.V(1).Info("Job is already deployed")
return ctrl.Result{}, nil
}

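// Build a sanitized copy for the worker cluster: fresh ObjectMeta (name,
// namespace, labels only), no managedBy, and no controller-injected labels
// in the selector or pod template.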
deployObj := job.DeepCopy()
deployObj.ObjectMeta = metav1.ObjectMeta{
Name: job.Name,
Namespace: job.Namespace,
Labels: job.Labels,
}
for k := range deployObj.Labels {
if _, ok := excludeLabelKeys[k]; ok {
delete(deployObj.Labels, k)
}
}
deployObj.Spec.ManagedBy = nil
if deployObj.Spec.Selector != nil {
for k := range deployObj.Spec.Selector.MatchLabels {
if _, ok := excludeLabelKeys[k]; ok {
delete(deployObj.Spec.Selector.MatchLabels, k)
}
}
}
for k := range deployObj.Spec.Template.Labels {
if _, ok := excludeLabelKeys[k]; ok {
delete(deployObj.Spec.Template.Labels, k)
}
}

obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&deployObj)
if err != nil {
log.Error(err, "Failed to convert job to unstructured")
return ctrl.Result{}, err
}
uobj := &unstructured.Unstructured{Object: obj}
data, err := uobj.MarshalJSON()
if err != nil {
log.Error(err, "Failed to marshal job")
return ctrl.Result{}, err
}

patchReq := &v1.PatchKubernetesObjectRequest{
Namespace: job.Namespace,
Name: job.Name,
Group: jobGVR.Group,
Version: jobGVR.Version,
Resource: jobGVR.Resource,
Data: data,
}
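// Report the Job's total nvidia.com/gpu limit along with the patch request.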
var totalGPUs int
for _, container := range job.Spec.Template.Spec.Containers {
if container.Resources.Limits != nil {
if gpu, ok := container.Resources.Limits["nvidia.com/gpu"]; ok {
totalGPUs += int(gpu.Value())
}
}
}
if totalGPUs > 0 {
patchReq.Resources = &v1.PatchKubernetesObjectRequest_Resources{
GpuLimit: int32(totalGPUs),
}
}

resp, err := c.ssClient.PatchKubernetesObject(ctx, patchReq)
if err != nil {
log.Error(err, "Failed to patch job", "data", string(data))
return ctrl.Result{}, err
}
log.V(2).Info("Patched job", "response", resp)

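// Record the target cluster, remote UID, and deploy time on the local Job;
// the deployed-at annotation is the idempotence marker checked above.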
patch := client.MergeFrom(&job)
newJob := job.DeepCopy()
if newJob.Annotations == nil {
newJob.Annotations = make(map[string]string)
}
newJob.Annotations[annoKeyClusterID] = resp.ClusterId
newJob.Annotations[annoKeyUID] = resp.Uid
newJob.Annotations[annoKeyDeployedAt] = metav1.Now().UTC().Format(time.RFC3339)
if err := c.k8sClient.Patch(ctx, newJob, patch); err != nil {
[Inline review thread]
Contributor: Question: I could be totally wrong, but if we do patch the job here, that will trigger another invocation of Reconcile? Just wanted to make sure there is no infinite call.
Contributor (Author): If the job is already deployed, L116 simply returns and finishes the reconciliation.
Contributor: Ah I see. LG!
log.Error(err, "Failed to update job")
return ctrl.Result{}, err
}

c.recorder.Event(&job, "Normal", "Deployed", fmt.Sprintf("Job(%s) is deployed to the Cluster(%s)", resp.Uid, resp.ClusterId))
log.Info("Deployed job")
return ctrl.Result{}, nil
}
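
For reference, a minimal sketch of a manifest this controller would pick up. Everything here is illustrative (the name, namespace, image, and command are hypothetical); the one hard requirement from the code above is that spec.managedBy equals controllerName, which in turn needs a cluster where the JobManagedBy feature gate is enabled.

apiVersion: batch/v1
kind: Job
metadata:
  name: sample-job              # hypothetical
  namespace: default
spec:
  managedBy: cloudnatix.com/job-controller  # must match controllerName
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: main
        image: busybox          # hypothetical
        command: ["echo", "done"]
        resources:
          limits:
            nvidia.com/gpu: 1   # optional; summed into the patch request's GPU limit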