Merge pull request #2038 from Thor-wl/0224-spark3.3
Fix the resource calculation error for running jobs whose podgroup contains minResource
volcano-sh-bot authored Mar 5, 2022
2 parents 523bc2f + 7dfec97 commit 9698fbb
Showing 4 changed files with 70 additions and 3 deletions.
4 changes: 2 additions & 2 deletions pkg/scheduler/plugins/numaaware/numaaware.go
@@ -233,14 +233,14 @@ func getNodeNumaNumForTask(nodeInfo []*api.NodeInfo, resAssignMap map[string]api
assignCpus := resAssignMap[node.Name][string(v1.ResourceCPU)]
nodeNumaCnts[index] = api.ScoredNode{
NodeName: node.Name,
Score: int64(getNumaNodeCntForCpuID(assignCpus, node.NumaSchedulerInfo.CPUDetail)),
Score: int64(getNumaNodeCntForCPUID(assignCpus, node.NumaSchedulerInfo.CPUDetail)),
}
})

return nodeNumaCnts
}

func getNumaNodeCntForCpuID(cpus cpuset.CPUSet, cpuDetails topology.CPUDetails) int {
func getNumaNodeCntForCPUID(cpus cpuset.CPUSet, cpuDetails topology.CPUDetails) int {
mask, _ := bitmask.NewBitMask()
s := cpus.ToSlice()

13 changes: 12 additions & 1 deletion pkg/scheduler/plugins/overcommit/overcommit.go
@@ -88,10 +88,21 @@ func (op *overcommitPlugin) OnSessionOpen(ssn *framework.Session) {
}
op.idleResource = total.Clone().Multi(op.overCommitFactor).Sub(used)

// calculate inqueue job resources
for _, job := range ssn.Jobs {
// calculate inqueue job resources
if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue && job.PodGroup.Spec.MinResources != nil {
op.inqueueResource.Add(api.NewResource(*job.PodGroup.Spec.MinResources))
continue
}
// calculate inqueue resource for running jobs
// The check 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' guards against cases such as the following:
// a Spark job can be effectively complete (its driver pod has finished) while the podgroup is still in the Running phase; without this check, the job's allocated resources would be reserved again.
if job.PodGroup.Status.Phase == scheduling.PodGroupRunning &&
job.PodGroup.Spec.MinResources != nil &&
job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember {
allocated := util.GetAllocatedResource(job)
inqueued := util.GetInqueueResource(job, allocated)
op.inqueueResource.Add(inqueued)
}
}

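The guard on Status.Running is the subtle part of the overcommit.go change above: a PodGroup can stay in the Running phase after the workload is effectively finished (the Spark driver case described in the comment), and in that state its MinResources must not be reserved again. Below is a minimal, self-contained sketch of just that guard; podGroupView and reserveForRunning are illustrative stand-ins, not the volcano types.

```go
package main

import "fmt"

// podGroupView stands in for the few PodGroup fields the new check reads;
// it is not the volcano API, only an illustration of the guard's logic.
type podGroupView struct {
	phase           string
	hasMinResources bool
	running         int32 // PodGroup.Status.Running
	minMember       int32 // PodGroup.Spec.MinMember
}

// reserveForRunning mirrors the condition added to overcommit.go (and, below,
// proportion.go): only a Running podgroup with MinResources whose running pod
// count still meets MinMember keeps a reservation for its unallocated minimum.
func reserveForRunning(pg podGroupView) bool {
	return pg.phase == "Running" && pg.hasMinResources && pg.running >= pg.minMember
}

func main() {
	// Hypothetical Spark-style gang with minMember = 1 (the driver).
	driverRunning := podGroupView{phase: "Running", hasMinResources: true, running: 1, minMember: 1}
	// Driver pod has completed, but the podgroup phase has not left Running yet.
	driverFinished := podGroupView{phase: "Running", hasMinResources: true, running: 0, minMember: 1}

	fmt.Println(reserveForRunning(driverRunning))  // true  -> reserve the not-yet-allocated share of MinResources
	fmt.Println(reserveForRunning(driverFinished)) // false -> do not reserve MinResources again
}
```

Without the running >= minMember term, the second case would re-add the job's minimum to the inqueue total even though the job is winding down.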
11 changes: 11 additions & 0 deletions pkg/scheduler/plugins/proportion/proportion.go
@@ -140,6 +140,17 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue {
attr.inqueue.Add(job.GetMinResources())
}

// calculate inqueue resource for running jobs
// The check 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' guards against cases such as the following:
// a Spark job can be effectively complete (its driver pod has finished) while the podgroup is still in the Running phase; without this check, the job's allocated resources would be reserved again.
if job.PodGroup.Status.Phase == scheduling.PodGroupRunning &&
job.PodGroup.Spec.MinResources != nil &&
job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember {
allocated := util.GetAllocatedResource(job)
inqueued := util.GetInqueueResource(job, allocated)
attr.inqueue.Add(inqueued)
}
}

// Record metrics
45 changes: 45 additions & 0 deletions pkg/scheduler/plugins/util/util.go
@@ -298,3 +298,48 @@ func NormalizeScore(maxPriority int64, reverse bool, scores []api.ScoredNode) {
scores[idx].Score = score
}
}

// GetAllocatedResource returns the allocated resource for the given job
func GetAllocatedResource(job *api.JobInfo) *api.Resource {
allocated := &api.Resource{}
for status, tasks := range job.TaskStatusIndex {
if api.AllocatedStatus(status) {
for _, t := range tasks {
allocated.Add(t.Resreq)
}
}
}
return allocated
}

// GetInqueueResource returns the resource still reserved for a running job some of whose pods have not been allocated resources yet.
func GetInqueueResource(job *api.JobInfo, allocated *api.Resource) *api.Resource {
inqueue := &api.Resource{}
for rName, rQuantity := range *job.PodGroup.Spec.MinResources {
switch rName {
case v1.ResourceCPU:
reservedCPU := float64(rQuantity.Value()) - allocated.MilliCPU
if reservedCPU > 0 {
inqueue.MilliCPU = reservedCPU
}
case v1.ResourceMemory:
reservedMemory := float64(rQuantity.Value()) - allocated.Memory
if reservedMemory > 0 {
inqueue.Memory = reservedMemory
}
default:
if inqueue.ScalarResources == nil {
inqueue.ScalarResources = make(map[v1.ResourceName]float64)
}
if allocatedMount, ok := allocated.ScalarResources[rName]; !ok {
inqueue.ScalarResources[rName] = float64(rQuantity.Value())
} else {
reservedScalarRes := float64(rQuantity.Value()) - allocatedMount
if reservedScalarRes > 0 {
inqueue.ScalarResources[rName] = reservedScalarRes
}
}
}
}
return inqueue
}
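The two helpers combine into a per-dimension clamp: GetAllocatedResource sums the requests of the job's already-allocated tasks, and GetInqueueResource reserves, for each resource named in MinResources, only the positive remainder above that sum. The following is a standalone numeric walk-through of that clamp, using plain maps instead of api.Resource / v1.ResourceList; the function name, resource names, and quantities are made up for illustration.

```go
package main

import "fmt"

// remaining applies the per-dimension rule of GetInqueueResource to plain
// numbers: keep max(min - allocated, 0) for every resource listed in the
// gang minimum, and drop dimensions that the allocation already covers.
func remaining(minResources, allocated map[string]float64) map[string]float64 {
	inqueue := map[string]float64{}
	for name, want := range minResources {
		if rest := want - allocated[name]; rest > 0 { // a missing key reads as 0, i.e. nothing allocated yet
			inqueue[name] = rest
		}
	}
	return inqueue
}

func main() {
	minResources := map[string]float64{
		"cpu":            4000,    // e.g. 4 CPUs, written in milli-units
		"memory":         8 << 30, // e.g. 8 GiB, written in bytes
		"nvidia.com/gpu": 2,
	}
	allocated := map[string]float64{
		"cpu":    2500,     // 2.5 CPUs already held by allocated tasks
		"memory": 10 << 30, // allocation already exceeds the gang minimum
		// no GPU allocated yet
	}

	// cpu keeps 1500 reserved, the GPUs stay fully reserved, memory drops out.
	fmt.Println(remaining(minResources, allocated))
}
```

With these numbers the job keeps 1500 milli-CPU and both GPUs reserved while it waits for the rest of its pods; memory contributes nothing because the allocation already covers the minimum.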
