diff --git a/server/internal/scheduler/scheduler.go b/server/internal/scheduler/scheduler.go
index c2066a9..1d47365 100644
--- a/server/internal/scheduler/scheduler.go
+++ b/server/internal/scheduler/scheduler.go
@@ -66,6 +66,11 @@ func (s *S) Schedule(userInfo *auth.UserInfo, prevScheduledClusterID string, gpu
 		namespacesByCluster[env.ClusterID] = env.Namespace
 	}
 	s.logger.V(1).Info("Scheduling a workload", "gpuCount", gpuCount, "assignedClustersEnvs", userInfo.AssignedKubernetesEnvs)
+
+	var (
+		bestResult *SchedulingResult
+		bestScore  float64
+	)
 	for _, c := range clusters {
 		if time.Since(c.UpdatedAt) > staleThreshold {
 			s.logger.V(1).Info("Ignoring a stale cluster", "clusterID", c.ClusterID)
@@ -82,30 +87,67 @@
 			continue
 		}
 
-		// Just pick up the first cluster that can provision GPU resources if gpuCount is > 0.
-		// Otherwise just pick up the first cluster.
-		if gpuCount > 0 {
-			var status v1.ClusterStatus
-			if err := proto.Unmarshal(c.Status, &status); err != nil {
-				return SchedulingResult{}, err
-			}
-
-			if ok, err := canProvisionGPUs(gpuCount, &status); err != nil {
-				return SchedulingResult{}, err
-			} else if !ok {
-				s.logger.V(1).Info("Ignoring a cluster that cannot provision GPUs", "clusterID", c.ClusterID)
-				continue
+		score, err := scoreCluster(c, gpuCount)
+		if err != nil {
+			return SchedulingResult{}, err
+		}
+		if !score.isFeasible {
+			s.logger.V(1).Info("Ignoring a cluster as the workload cannot be scheduled", "clusterID", c.ClusterID)
+			continue
+		}
+		if bestResult == nil || score.score > bestScore {
+			bestResult = &SchedulingResult{
+				ClusterID:   c.ClusterID,
+				ClusterName: c.Name,
+				Namespace:   ns,
 			}
+			bestScore = score.score
 		}
-		s.logger.V(1).Info("Scheduled a workload", "clusterID", c.ClusterID, "namespace", ns)
-		return SchedulingResult{
-			ClusterID:   c.ClusterID,
-			ClusterName: c.Name,
-			Namespace:   ns,
+	}
+
+	if bestResult == nil {
+		return SchedulingResult{}, fmt.Errorf("no schedulable cluster")
+	}
+
+	s.logger.V(1).Info("Scheduled a workload", "clusterID", bestResult.ClusterID, "namespace", bestResult.Namespace)
+	return *bestResult, nil
+}
+
+type schedulingScore struct {
+	isFeasible bool
+	score      float64
+}
+
+func scoreCluster(c *store.Cluster, requestedGPUs int) (schedulingScore, error) {
+	if requestedGPUs == 0 {
+		// Simply assume that the workload can run there.
+		return schedulingScore{
+			isFeasible: true,
+			score:      0,
 		}, nil
 	}
 
-	return SchedulingResult{}, fmt.Errorf("no schedulable cluster")
+	var status v1.ClusterStatus
+	if err := proto.Unmarshal(c.Status, &status); err != nil {
+		return schedulingScore{}, err
+	}
+
+	ok, err := canProvisionGPUs(requestedGPUs, &status)
+	if err != nil {
+		return schedulingScore{}, err
+	}
+
+	if !ok {
+		return schedulingScore{
+			isFeasible: false,
+		}, err
+	}
+	return schedulingScore{
+		isFeasible: true,
+		// TODO(kenji): Improve the scoring algorithm. Currently it is a simple best-fit where a cluster with the largest
+		// number of available GPUs is selected.
+		score: float64(availableGPUs(&status)),
+	}, nil
 }
 
 // canProvisionGPUs returns true if the cluster can provision GPUs.
@@ -116,15 +158,7 @@ func canProvisionGPUs(requestedGPUs int, status *v1.ClusterStatus) (bool, error)
 		// TODO(kenji): Take into resource fragmentation.
 		// TODO(kenji): Subtract the number of GPUs allocated for this scheduling (and then revert the change
 		// once dispatcher reports the updated status including the scheduled workload.
-		var allocatable int
-		for _, n := range status.GpuNodes {
-			allocatable += int(n.AllocatableCount)
-		}
-		var allocated int
-		for _, p := range status.GpuPods {
-			allocated += int(p.AllocatedCount)
-		}
-		return requestedGPUs <= allocatable-allocated, nil
+		return requestedGPUs <= availableGPUs(status), nil
 	}
 
 	for _, pr := range status.ProvisionableResources {
@@ -148,6 +182,18 @@ func canProvisionGPUs(requestedGPUs int, status *v1.ClusterStatus) (bool, error)
 	return false, nil
 }
 
+func availableGPUs(status *v1.ClusterStatus) int {
+	var allocatable int
+	for _, n := range status.GpuNodes {
+		allocatable += int(n.AllocatableCount)
+	}
+	var allocated int
+	for _, p := range status.GpuPods {
+		allocated += int(p.AllocatedCount)
+	}
+	return allocatable - allocated
+}
+
 func isAWSInstanceTypeForNvidiaGPU(instType string) (bool, error) {
 	// Get the family from the instance type.
 	l := strings.Split(instType, ".")
diff --git a/server/internal/scheduler/scheduler_test.go b/server/internal/scheduler/scheduler_test.go
index 589eb54..f5a9693 100644
--- a/server/internal/scheduler/scheduler_test.go
+++ b/server/internal/scheduler/scheduler_test.go
@@ -145,6 +145,53 @@ func TestSchedule(t *testing.T) {
 			prevClusterID: "cluster0",
 			wantErr:       true,
 		},
+		{
+			name: "two gpu clusters",
+			clusters: []*store.Cluster{
+				{
+					ClusterID: "cluster0",
+					TenantID:  tenantID,
+					Status: marshalStatus(t, &v1.ClusterStatus{
+						GpuNodes: []*v1.GpuNode{
+							{
+								ResourceName:     "nvidia.com/gpu",
+								AllocatableCount: 16,
+							},
+						},
+					}),
+				},
+				{
+					ClusterID: "cluster1",
+					TenantID:  tenantID,
+					Status: marshalStatus(t, &v1.ClusterStatus{
+						GpuNodes: []*v1.GpuNode{
+							{
+								ResourceName:     "nvidia.com/gpu",
+								AllocatableCount: 8,
+							},
+						},
+					}),
+				},
+			},
+			userInfo: &auth.UserInfo{
+				TenantID: tenantID,
+				AssignedKubernetesEnvs: []auth.AssignedKubernetesEnv{
+					{
+						ClusterID: "cluster0",
+						Namespace: "namespace0",
+					},
+					{
+						ClusterID: "cluster1",
+						Namespace: "namespace1",
+					},
+				},
+			},
+			gpuCount: 1,
+			want: SchedulingResult{
+				ClusterID: "cluster0",
+				Namespace: "namespace0",
+			},
+		},
 	}
 
 	for _, tc := range tcs {