[cherry-pick] cherry pick recently fix to v1.5 #2057

Merged 2 commits on Mar 7, 2022.
pkg/controllers/podgroup/pg_controller_handler.go: 3 additions & 0 deletions
@@ -94,6 +94,9 @@ func (pg *pgcontroller) createNormalPodPGIfNotExist(pod *v1.Pod) error {
			PriorityClassName: pod.Spec.PriorityClassName,
			MinResources: calcPGMinResources(pod),
		},
+		Status: scheduling.PodGroupStatus{
+			Phase: scheduling.PodGroupPending,
+		},
	}
	if queueName, ok := pod.Annotations[scheduling.QueueNameAnnotationKey]; ok {
		obj.Spec.Queue = queueName
pkg/scheduler/plugins/numaaware/numaaware.go: 3 additions & 2 deletions
@@ -231,15 +231,16 @@ func getNodeNumaNumForTask(nodeInfo []*api.NodeInfo, resAssignMap map[string]api
	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodeInfo), func(index int) {
		node := nodeInfo[index]
		assignCpus := resAssignMap[node.Name][string(v1.ResourceCPU)]
+
		mx.Lock()
		defer mx.Unlock()
-		nodeNumaNumMap[node.Name] = int64(getNumaNodeCntForcpuID(assignCpus, node.NumaSchedulerInfo.CPUDetail))
+		nodeNumaNumMap[node.Name] = int64(getNumaNodeCntForCPUID(assignCpus, node.NumaSchedulerInfo.CPUDetail))
	})

	return nodeNumaNumMap
}

-func getNumaNodeCntForcpuID(cpus cpuset.CPUSet, cpuDetails topology.CPUDetails) int {
+func getNumaNodeCntForCPUID(cpus cpuset.CPUSet, cpuDetails topology.CPUDetails) int {
	mask, _ := bitmask.NewBitMask()
	s := cpus.ToSlice()

pkg/scheduler/plugins/overcommit/overcommit.go: 12 additions & 1 deletion
@@ -88,10 +88,21 @@ func (op *overcommitPlugin) OnSessionOpen(ssn *framework.Session) {
	}
	op.idleResource = total.Clone().Multi(op.overCommitFactor).Sub(used)

-	// calculate inqueue job resources
	for _, job := range ssn.Jobs {
+		// calculate inqueue job resources
		if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue && job.PodGroup.Spec.MinResources != nil {
			op.inqueueResource.Add(api.NewResource(*job.PodGroup.Spec.MinResources))
+			continue
		}
+		// calculate inqueue resource for running jobs
+		// the check 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' covers cases such as the following:
+		// a Spark job whose driver pod has completed while the podgroup stays Running; without the check, the already-allocated resources would be reserved again.
+		if job.PodGroup.Status.Phase == scheduling.PodGroupRunning &&
+			job.PodGroup.Spec.MinResources != nil &&
+			job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember {
+			allocated := util.GetAllocatedResource(job)
+			inqueued := util.GetInqueueResource(job, allocated)
+			op.inqueueResource.Add(inqueued)
+		}
	}

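To make the intent of the new branch concrete, here is a minimal standalone sketch in plain Go. The type, function names, and figures are hypothetical and do not use the Volcano api/scheduling types; it only illustrates the decision the plugin now makes: an Inqueue podgroup reserves its full MinResources, while a Running podgroup reserves only the unallocated remainder, and only when at least MinMember pods are still running.

package main

import "fmt"

// podGroup is a hypothetical, stripped-down stand-in for the fields the plugin inspects.
type podGroup struct {
	phase     string  // scheduling phase: "Inqueue" or "Running"
	minMember int32   // PodGroupSpec.MinMember
	running   int32   // PodGroupStatus.Running
	minCPU    float64 // MinResources, reduced to a single CPU figure for brevity
}

// inqueueShare mirrors the branch added above: an Inqueue podgroup reserves its full
// MinResources, while a Running podgroup that still has at least MinMember pods running
// reserves only the part of MinResources that is not yet allocated (never negative).
func inqueueShare(pg podGroup, allocatedCPU float64) float64 {
	switch {
	case pg.phase == "Inqueue":
		return pg.minCPU
	case pg.phase == "Running" && pg.running >= pg.minMember:
		if reserved := pg.minCPU - allocatedCPU; reserved > 0 {
			return reserved
		}
	}
	return 0
}

func main() {
	// A Running podgroup that still meets MinMember: reserve the CPU not yet allocated.
	active := podGroup{phase: "Running", minMember: 3, running: 3, minCPU: 3}
	fmt.Println(inqueueShare(active, 2)) // 1

	// A Spark-style podgroup whose driver pod completed: Running drops below MinMember,
	// so nothing is reserved for it again.
	sparkDone := podGroup{phase: "Running", minMember: 3, running: 2, minCPU: 3}
	fmt.Println(inqueueShare(sparkDone, 2)) // 0
}

The sketch prints 1 and then 0: the podgroup that still meets MinMember reserves its missing CPU, while the Spark-style podgroup whose driver finished is skipped.
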
pkg/scheduler/plugins/proportion/proportion.go: 11 additions & 0 deletions
@@ -140,6 +140,17 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
		if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue {
			attr.inqueue.Add(job.GetMinResources())
		}
+
+		// calculate inqueue resource for running jobs
+		// the check 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' covers cases such as the following:
+		// a Spark job whose driver pod has completed while the podgroup stays Running; without the check, the already-allocated resources would be reserved again.
+		if job.PodGroup.Status.Phase == scheduling.PodGroupRunning &&
+			job.PodGroup.Spec.MinResources != nil &&
+			job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember {
+			allocated := util.GetAllocatedResource(job)
+			inqueued := util.GetInqueueResource(job, allocated)
+			attr.inqueue.Add(inqueued)
+		}
	}

	// Record metrics
pkg/scheduler/plugins/util/util.go: 45 additions & 0 deletions
@@ -298,3 +298,48 @@ func NormalizeScore(maxPriority int64, reverse bool, scores map[string]int64) {
		scores[key] = score
	}
}
+
+// GetAllocatedResource returns the resource already allocated to the tasks of the given job
+func GetAllocatedResource(job *api.JobInfo) *api.Resource {
+	allocated := &api.Resource{}
+	for status, tasks := range job.TaskStatusIndex {
+		if api.AllocatedStatus(status) {
+			for _, t := range tasks {
+				allocated.Add(t.Resreq)
+			}
+		}
+	}
+	return allocated
+}
+
+// GetInqueueResource returns the resource still to be reserved for a running job whose pods have not all been allocated resources yet
+func GetInqueueResource(job *api.JobInfo, allocated *api.Resource) *api.Resource {
+	inqueue := &api.Resource{}
+	for rName, rQuantity := range *job.PodGroup.Spec.MinResources {
+		switch rName {
+		case v1.ResourceCPU:
+			reservedCPU := float64(rQuantity.Value()) - allocated.MilliCPU
+			if reservedCPU > 0 {
+				inqueue.MilliCPU = reservedCPU
+			}
+		case v1.ResourceMemory:
+			reservedMemory := float64(rQuantity.Value()) - allocated.Memory
+			if reservedMemory > 0 {
+				inqueue.Memory = reservedMemory
+			}
+		default:
+			if inqueue.ScalarResources == nil {
+				inqueue.ScalarResources = make(map[v1.ResourceName]float64)
+			}
+			if allocatedMount, ok := allocated.ScalarResources[rName]; !ok {
+				inqueue.ScalarResources[rName] = float64(rQuantity.Value())
+			} else {
+				reservedScalarRes := float64(rQuantity.Value()) - allocatedMount
+				if reservedScalarRes > 0 {
+					inqueue.ScalarResources[rName] = reservedScalarRes
+				}
+			}
+		}
+	}
+	return inqueue
+}
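For orientation, here is a minimal standalone sketch of the per-resource arithmetic GetInqueueResource performs. The helper name and the figures are hypothetical, and plain Go maps stand in for the api.Resource type: each quantity in MinResources is reduced by what GetAllocatedResource already counted, and only positive remainders are kept.

package main

import "fmt"

// reserveRemainder keeps, for every resource named in minRes, the part that the already-allocated
// amount does not cover; non-positive remainders are dropped. This mirrors the clamp-at-zero
// subtraction GetInqueueResource does per resource name (CPU, memory, and scalar resources).
func reserveRemainder(minRes, allocated map[string]float64) map[string]float64 {
	inqueue := map[string]float64{}
	for name, want := range minRes {
		if remaining := want - allocated[name]; remaining > 0 {
			inqueue[name] = remaining
		}
	}
	return inqueue
}

func main() {
	// Hypothetical figures: a podgroup asking for 4 CPUs, 8Gi of memory and 2 GPUs,
	// with part of that already allocated to its running tasks.
	minRes := map[string]float64{"cpu": 4, "memory": 8 << 30, "nvidia.com/gpu": 2}
	allocated := map[string]float64{"cpu": 2.5, "memory": 8 << 30, "nvidia.com/gpu": 1}
	fmt.Println(reserveRemainder(minRes, allocated)) // cpu:1.5 and nvidia.com/gpu:1; memory is fully covered
}

The real helpers apply the same idea to the api.Resource fields (MilliCPU, Memory, ScalarResources), with the requested quantities taken from PodGroup.Spec.MinResources.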