feat: implement a scheduling scoring algorithm
kkaneda committed Feb 12, 2025
1 parent fd90a5e commit e16fdd3
Showing 2 changed files with 121 additions and 28 deletions.
102 changes: 74 additions & 28 deletions server/internal/scheduler/scheduler.go
@@ -66,6 +66,11 @@ func (s *S) Schedule(userInfo *auth.UserInfo, prevScheduledClusterID string, gpu
namespacesByCluster[env.ClusterID] = env.Namespace
}
s.logger.V(1).Info("Scheduling a workload", "gpuCount", gpuCount, "assignedClustersEnvs", userInfo.AssignedKubernetesEnvs)

var (
bestResult *SchedulingResult
bestScore float64
)
for _, c := range clusters {
if time.Since(c.UpdatedAt) > staleThreshold {
s.logger.V(1).Info("Ignoring a stale cluster", "clusterID", c.ClusterID)
@@ -82,30 +87,67 @@
continue
}

// Just pick up the first cluster that can provision GPU resources if gpuCount is > 0.
// Otherwise just pick up the first cluster.
if gpuCount > 0 {
var status v1.ClusterStatus
if err := proto.Unmarshal(c.Status, &status); err != nil {
return SchedulingResult{}, err
}

if ok, err := canProvisionGPUs(gpuCount, &status); err != nil {
return SchedulingResult{}, err
} else if !ok {
s.logger.V(1).Info("Ignoring a cluster that cannot provision GPUs", "clusterID", c.ClusterID)
continue
score, err := scoreCluster(c, gpuCount)
if err != nil {
return SchedulingResult{}, err
}
if !score.isFeasible {
s.logger.V(1).Info("Ignoring a cluster as the workload cannot be scheduled", "clusterID", c.ClusterID)
continue
}
if bestResult == nil || score.score > bestScore {
bestResult = &SchedulingResult{
ClusterID: c.ClusterID,
ClusterName: c.Name,
Namespace: ns,
}
bestScore = score.score
}
s.logger.V(1).Info("Scheduled a workload", "clusterID", c.ClusterID, "namespace", ns)
return SchedulingResult{
ClusterID: c.ClusterID,
ClusterName: c.Name,
Namespace: ns,
}

if bestResult == nil {
return SchedulingResult{}, fmt.Errorf("no schedulable cluster")
}

s.logger.V(1).Info("Scheduled a workload", "clusterID", bestResult.ClusterID, "namespace", bestResult.Namespace)
return *bestResult, nil
}

type schedulingScore struct {
isFeasible bool
score float64
}

func scoreCluster(c *store.Cluster, requestedGPUs int) (schedulingScore, error) {
if requestedGPUs == 0 {
// Simply assume that the workload can run there.
return schedulingScore{
isFeasible: true,
score: 0,
}, nil
}

return SchedulingResult{}, fmt.Errorf("no schedulable cluster")
var status v1.ClusterStatus
if err := proto.Unmarshal(c.Status, &status); err != nil {
return schedulingScore{}, err
}

ok, err := canProvisionGPUs(requestedGPUs, &status)
if err != nil {
return schedulingScore{}, err
}

if !ok {
return schedulingScore{
isFeasible: false,
}, nil
}
return schedulingScore{
isFeasible: true,
// TODO(kenji): Improve the scoring algorithm. Currently it is a simple best-fit where the cluster with the largest
// number of available GPUs is selected.
score: float64(availableGPUs(&status)),
}, nil
}

// canProvisionGPUs returns true if the cluster can provision GPUs.
@@ -116,15 +158,7 @@ func canProvisionGPUs(requestedGPUs int, status *v1.ClusterStatus) (bool, error)
// TODO(kenji): Take into account resource fragmentation.
// TODO(kenji): Subtract the number of GPUs allocated for this scheduling (and then revert the change
// once the dispatcher reports the updated status including the scheduled workload).
var allocatable int
for _, n := range status.GpuNodes {
allocatable += int(n.AllocatableCount)
}
var allocated int
for _, p := range status.GpuPods {
allocated += int(p.AllocatedCount)
}
return requestedGPUs <= allocatable-allocated, nil
return requestedGPUs <= availableGPUs(status), nil
}

for _, pr := range status.ProvisionableResources {
@@ -148,6 +182,18 @@ func canProvisionGPUs(requestedGPUs int, status *v1.ClusterStatus) (bool, error)
return false, nil
}

func availableGPUs(status *v1.ClusterStatus) int {
var allocatable int
for _, n := range status.GpuNodes {
allocatable += int(n.AllocatableCount)
}
var allocated int
for _, p := range status.GpuPods {
allocated += int(p.AllocatedCount)
}
return allocatable - allocated
}

func isAWSInstanceTypeForNvidiaGPU(instType string) (bool, error) {
// Get the family from the instance type.
l := strings.Split(instType, ".")
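For readers skimming the diff, here is a minimal, self-contained sketch of the selection strategy this commit introduces, using hypothetical types in place of store.Cluster and v1.ClusterStatus (this is not the repository's code): every feasible cluster is scored by its available GPUs (allocatable minus allocated) and the highest-scoring cluster wins, while a zero-GPU request is treated as feasible everywhere with score 0, so the first candidate encountered is kept.

package main

import (
	"errors"
	"fmt"
)

// cluster is a hypothetical stand-in for store.Cluster plus the GPU counters
// carried in v1.ClusterStatus (GpuNodes.AllocatableCount, GpuPods.AllocatedCount).
type cluster struct {
	id          string
	allocatable int
	allocated   int
}

// availableGPUs mirrors the helper added in this commit: allocatable minus allocated.
func availableGPUs(c cluster) int {
	return c.allocatable - c.allocated
}

// score mirrors scoreCluster: a zero-GPU request is always feasible with score 0;
// otherwise the cluster is feasible only if the request fits, and the score is the
// number of available GPUs.
func score(c cluster, requestedGPUs int) (feasible bool, s float64) {
	if requestedGPUs == 0 {
		return true, 0
	}
	avail := availableGPUs(c)
	return requestedGPUs <= avail, float64(avail)
}

// schedule mirrors the new selection loop in Schedule: skip infeasible clusters
// and keep the one with the highest score.
func schedule(clusters []cluster, requestedGPUs int) (string, error) {
	var (
		best      string
		bestScore float64
		found     bool
	)
	for _, c := range clusters {
		ok, s := score(c, requestedGPUs)
		if !ok {
			continue
		}
		if !found || s > bestScore {
			best, bestScore, found = c.id, s, true
		}
	}
	if !found {
		return "", errors.New("no schedulable cluster")
	}
	return best, nil
}

func main() {
	clusters := []cluster{
		{id: "cluster0", allocatable: 16},
		{id: "cluster1", allocatable: 8},
	}
	got, err := schedule(clusters, 1)
	fmt.Println(got, err) // cluster0 <nil>
}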
47 changes: 47 additions & 0 deletions server/internal/scheduler/scheduler_test.go
@@ -145,6 +145,53 @@ func TestSchedule(t *testing.T) {
prevClusterID: "cluster0",
wantErr: true,
},
{
name: "two gpu clusters",
clusters: []*store.Cluster{
{
ClusterID: "cluster0",
TenantID: tenantID,
Status: marshalStatus(t, &v1.ClusterStatus{
GpuNodes: []*v1.GpuNode{
{
ResourceName: "nvidia.com/gpu",
AllocatableCount: 16,
},
},
}),
},
{
ClusterID: "cluster1",
TenantID: tenantID,
Status: marshalStatus(t, &v1.ClusterStatus{
GpuNodes: []*v1.GpuNode{
{
ResourceName: "nvidia.com/gpu",
AllocatableCount: 8,
},
},
}),
},
},
userInfo: &auth.UserInfo{
TenantID: tenantID,
AssignedKubernetesEnvs: []auth.AssignedKubernetesEnv{
{
ClusterID: "cluster0",
Namespace: "namespace0",
},
{
ClusterID: "cluster1",
Namespace: "namespace1",
},
},
},
gpuCount: 1,
want: SchedulingResult{
ClusterID: "cluster0",
Namespace: "namespace0",
},
},
}

for _, tc := range tcs {
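Relating the new test case back to the scoring: neither cluster status reports any GpuPods, so available GPUs equal the allocatable counts (16 for cluster0, 8 for cluster1), both clusters are feasible for a 1-GPU request, and cluster0 wins with the higher score, which matches the expected SchedulingResult. A small hypothetical check of that arithmetic, with made-up allocation numbers in the last line (not part of the repository's tests):

package main

import "fmt"

// availableGPUs mirrors the helper in scheduler.go: allocatable minus allocated.
func availableGPUs(allocatable, allocated int) int {
	return allocatable - allocated
}

func main() {
	// "two gpu clusters" test case: no GpuPods are reported, so nothing is subtracted.
	fmt.Println(availableGPUs(16, 0) > availableGPUs(8, 0)) // true: cluster0 scores higher

	// Hypothetical allocations: 10 of 16 GPUs already allocated leaves 6 available,
	// so canProvisionGPUs would reject an 8-GPU request.
	fmt.Println(8 <= availableGPUs(16, 10)) // false
}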
