Skip to content

Commit

Permalink
Update queue status process.
Browse files Browse the repository at this point in the history
Signed-off-by: jiangkaihua <jiangkaihua1@huawei.com>
  • Loading branch information
jiangkaihua committed Dec 6, 2022
1 parent 3dc607c commit 93afc6b
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 12 deletions.
2 changes: 2 additions & 0 deletions pkg/controllers/queue/queue_controller_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF
queueStatus.State = queue.Status.State
}

queueStatus.Allocated = queue.Status.Allocated.DeepCopy()

// ignore update when status does not change
if reflect.DeepEqual(queueStatus, queue.Status) {
return nil
Expand Down
16 changes: 16 additions & 0 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,22 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b
return job, nil
}

// UpdateQueueStatus update the status of queue.
func (sc *SchedulerCache) UpdateQueueStatus(queue *schedulingapi.QueueInfo) error {
var newQueue = &vcv1beta1.Queue{}
if err := schedulingscheme.Scheme.Convert(queue.Queue, newQueue, nil); err != nil {
klog.Errorf("error occurred in converting scheduling.Queue to v1beta1.Queue: %s", err.Error())
return err
}

_, err := sc.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("error occurred in updating Queue <%s>: %s", newQueue.Name, err.Error())
return err
}
return nil
}

func (sc *SchedulerCache) recordPodGroupEvent(podGroup *schedulingapi.PodGroup, eventType, reason, msg string) {
if podGroup == nil {
return
Expand Down
21 changes: 12 additions & 9 deletions pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
scheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
)

// Cache collects pods/nodes/queues information
Expand Down Expand Up @@ -56,17 +56,20 @@ type Cache interface {
// UpdateJobStatus puts job in backlog for a while.
UpdateJobStatus(job *api.JobInfo, updatePG bool) (*api.JobInfo, error)

// UpdateQueueStatus update queue status.
UpdateQueueStatus(queue *api.QueueInfo) error

// GetPodVolumes get pod volume on the host
GetPodVolumes(task *api.TaskInfo, node *v1.Node) (*scheduling.PodVolumes, error)
GetPodVolumes(task *api.TaskInfo, node *v1.Node) (*volumebinding.PodVolumes, error)

// AllocateVolumes allocates volume on the host to the task
AllocateVolumes(task *api.TaskInfo, hostname string, podVolumes *scheduling.PodVolumes) error
AllocateVolumes(task *api.TaskInfo, hostname string, podVolumes *volumebinding.PodVolumes) error

// BindVolumes binds volumes to the task
BindVolumes(task *api.TaskInfo, volumes *scheduling.PodVolumes) error
BindVolumes(task *api.TaskInfo, volumes *volumebinding.PodVolumes) error

// RevertVolumes clean cache generated by AllocateVolumes
RevertVolumes(task *api.TaskInfo, podVolumes *scheduling.PodVolumes)
RevertVolumes(task *api.TaskInfo, podVolumes *volumebinding.PodVolumes)

// Client returns the kubernetes clientSet, which can be used by plugins
Client() kubernetes.Interface
Expand All @@ -88,10 +91,10 @@ type Cache interface {

// VolumeBinder interface for allocate and bind volumes
type VolumeBinder interface {
GetPodVolumes(task *api.TaskInfo, node *v1.Node) (*scheduling.PodVolumes, error)
RevertVolumes(task *api.TaskInfo, podVolumes *scheduling.PodVolumes)
AllocateVolumes(task *api.TaskInfo, hostname string, podVolumes *scheduling.PodVolumes) error
BindVolumes(task *api.TaskInfo, podVolumes *scheduling.PodVolumes) error
GetPodVolumes(task *api.TaskInfo, node *v1.Node) (*volumebinding.PodVolumes, error)
RevertVolumes(task *api.TaskInfo, podVolumes *volumebinding.PodVolumes)
AllocateVolumes(task *api.TaskInfo, hostname string, podVolumes *volumebinding.PodVolumes) error
BindVolumes(task *api.TaskInfo, podVolumes *volumebinding.PodVolumes) error
}

// Binder interface for binding task and hostname
Expand Down
30 changes: 30 additions & 0 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package framework

import (
"fmt"
"reflect"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -28,6 +29,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/klog"

"volcano.sh/apis/pkg/apis/scheduling"
schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme"
vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
Expand Down Expand Up @@ -189,6 +191,34 @@ func closeSession(ssn *Session) {
ju := newJobUpdater(ssn)
ju.UpdateAll()

// calculate allocated resources on each queue
var allocatedResources = make(map[api.QueueID]*api.Resource, len(ssn.Queues))
for queueID := range ssn.Queues {
allocatedResources[queueID] = &api.Resource{}
}
for _, job := range ssn.Jobs {
for _, runningTask := range job.TaskStatusIndex[api.Running] {
allocatedResources[job.Queue].Add(runningTask.Resreq)
}
}

// update queue status
for queueID := range ssn.Queues {
// convert api.Resource to v1.ResourceList
var queueStatus = util.ConvertRes2ResList(allocatedResources[queueID]).DeepCopy()
if reflect.DeepEqual(ssn.Queues[queueID].Queue.Status.Allocated, queueStatus) {
klog.V(5).Infof("Queue <%s> allocated resource keeps equal, no need to update queue status <%v>.",
queueID, ssn.Queues[queueID].Queue.Status.Allocated)
continue
}

ssn.Queues[queueID].Queue.Status.Allocated = queueStatus

if err := ssn.cache.UpdateQueueStatus(ssn.Queues[queueID]); err != nil {
klog.Errorf("failed to update queue <%s> status: %s", ssn.Queues[queueID].Name, err.Error())
}
}

ssn.Jobs = nil
ssn.Nodes = nil
ssn.RevocableNodes = nil
Expand Down
20 changes: 17 additions & 3 deletions pkg/scheduler/util/scheduler_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ package util
import (
"context"
"fmt"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
"math"
"math/rand"
"sort"
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"

"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/scheduler/api"
)
Expand Down Expand Up @@ -199,3 +202,14 @@ func GetMinInt(vals ...int) int {
}
return min
}

// ConvertRes2ResList convert resource type from api.Resource in scheduler to v1.ResourceList in yaml
func ConvertRes2ResList(res *api.Resource) v1.ResourceList {
var rl = v1.ResourceList{}
rl[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(res.MilliCPU), resource.DecimalSI)
rl[v1.ResourceMemory] = *resource.NewQuantity(int64(res.Memory), resource.BinarySI)
for resourceName, f := range res.ScalarResources {
rl[resourceName] = *resource.NewMilliQuantity(int64(f), resource.DecimalSI)
}
return rl
}

0 comments on commit 93afc6b

Please sign in to comment.