diff --git a/pkg/controllers/queue/queue_controller_action.go b/pkg/controllers/queue/queue_controller_action.go index f1b4bb10615..c9ad8ac5d2a 100644 --- a/pkg/controllers/queue/queue_controller_action.go +++ b/pkg/controllers/queue/queue_controller_action.go @@ -66,6 +66,8 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF queueStatus.State = queue.Status.State } + queueStatus.Allocated = queue.Status.Allocated.DeepCopy() + // ignore update when status does not change if reflect.DeepEqual(queueStatus, queue.Status) { return nil diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 594f7d05754..2f8b395c2c2 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -1218,6 +1218,22 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b return job, nil } +// UpdateQueueStatus update the status of queue. +func (sc *SchedulerCache) UpdateQueueStatus(queue *schedulingapi.QueueInfo) error { + var newQueue = &vcv1beta1.Queue{} + if err := schedulingscheme.Scheme.Convert(queue.Queue, newQueue, nil); err != nil { + klog.Errorf("error occurred in converting scheduling.Queue to v1beta1.Queue: %s", err.Error()) + return err + } + + _, err := sc.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("error occurred in updating Queue <%s>: %s", newQueue.Name, err.Error()) + return err + } + return nil +} + func (sc *SchedulerCache) recordPodGroupEvent(podGroup *schedulingapi.PodGroup, eventType, reason, msg string) { if podGroup == nil { return diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index 99bcd0a23c5..7b46c48471e 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -22,9 +22,9 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" - scheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding" "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding" ) // Cache collects pods/nodes/queues information @@ -56,17 +56,20 @@ type Cache interface { // UpdateJobStatus puts job in backlog for a while. UpdateJobStatus(job *api.JobInfo, updatePG bool) (*api.JobInfo, error) + // UpdateQueueStatus update queue status. + UpdateQueueStatus(queue *api.QueueInfo) error + // GetPodVolumes get pod volume on the host - GetPodVolumes(task *api.TaskInfo, node *v1.Node) (*scheduling.PodVolumes, error) + GetPodVolumes(task *api.TaskInfo, node *v1.Node) (*volumebinding.PodVolumes, error) // AllocateVolumes allocates volume on the host to the task - AllocateVolumes(task *api.TaskInfo, hostname string, podVolumes *scheduling.PodVolumes) error + AllocateVolumes(task *api.TaskInfo, hostname string, podVolumes *volumebinding.PodVolumes) error // BindVolumes binds volumes to the task - BindVolumes(task *api.TaskInfo, volumes *scheduling.PodVolumes) error + BindVolumes(task *api.TaskInfo, volumes *volumebinding.PodVolumes) error // RevertVolumes clean cache generated by AllocateVolumes - RevertVolumes(task *api.TaskInfo, podVolumes *scheduling.PodVolumes) + RevertVolumes(task *api.TaskInfo, podVolumes *volumebinding.PodVolumes) // Client returns the kubernetes clientSet, which can be used by plugins Client() kubernetes.Interface @@ -88,10 +91,10 @@ type Cache interface { // VolumeBinder interface for allocate and bind volumes type VolumeBinder interface { - GetPodVolumes(task *api.TaskInfo, node *v1.Node) (*scheduling.PodVolumes, error) - RevertVolumes(task *api.TaskInfo, podVolumes *scheduling.PodVolumes) - AllocateVolumes(task *api.TaskInfo, hostname string, podVolumes *scheduling.PodVolumes) error - BindVolumes(task *api.TaskInfo, podVolumes *scheduling.PodVolumes) error + GetPodVolumes(task *api.TaskInfo, node *v1.Node) (*volumebinding.PodVolumes, error) + RevertVolumes(task *api.TaskInfo, podVolumes *volumebinding.PodVolumes) + AllocateVolumes(task *api.TaskInfo, hostname string, podVolumes *volumebinding.PodVolumes) error + BindVolumes(task *api.TaskInfo, podVolumes *volumebinding.PodVolumes) error } // Binder interface for binding task and hostname diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 965fd64f5f1..6b089a47309 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -18,6 +18,7 @@ package framework import ( "fmt" + "reflect" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,6 +29,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "k8s.io/klog" + "volcano.sh/apis/pkg/apis/scheduling" schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme" vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" @@ -189,6 +191,34 @@ func closeSession(ssn *Session) { ju := newJobUpdater(ssn) ju.UpdateAll() + // calculate allocated resources on each queue + var allocatedResources = make(map[api.QueueID]*api.Resource, len(ssn.Queues)) + for queueID := range ssn.Queues { + allocatedResources[queueID] = &api.Resource{} + } + for _, job := range ssn.Jobs { + for _, runningTask := range job.TaskStatusIndex[api.Running] { + allocatedResources[job.Queue].Add(runningTask.Resreq) + } + } + + // update queue status + for queueID := range ssn.Queues { + // convert api.Resource to v1.ResourceList + var queueStatus = util.ConvertRes2ResList(allocatedResources[queueID]).DeepCopy() + if reflect.DeepEqual(ssn.Queues[queueID].Queue.Status.Allocated, queueStatus) { + klog.V(5).Infof("Queue <%s> allocated resource keeps equal, no need to update queue status <%v>.", + queueID, ssn.Queues[queueID].Queue.Status.Allocated) + continue + } + + ssn.Queues[queueID].Queue.Status.Allocated = queueStatus + + if err := ssn.cache.UpdateQueueStatus(ssn.Queues[queueID]); err != nil { + klog.Errorf("failed to update queue <%s> status: %s", ssn.Queues[queueID].Name, err.Error()) + } + } + ssn.Jobs = nil ssn.Nodes = nil ssn.RevocableNodes = nil diff --git a/pkg/scheduler/util/scheduler_helper.go b/pkg/scheduler/util/scheduler_helper.go index b63ddcac689..7d72999aa84 100644 --- a/pkg/scheduler/util/scheduler_helper.go +++ b/pkg/scheduler/util/scheduler_helper.go @@ -19,14 +19,17 @@ package util import ( "context" "fmt" - "k8s.io/client-go/util/workqueue" - "k8s.io/klog" - k8sframework "k8s.io/kubernetes/pkg/scheduler/framework" "math" "math/rand" "sort" "sync" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" + k8sframework "k8s.io/kubernetes/pkg/scheduler/framework" + "volcano.sh/volcano/cmd/scheduler/app/options" "volcano.sh/volcano/pkg/scheduler/api" ) @@ -199,3 +202,14 @@ func GetMinInt(vals ...int) int { } return min } + +// ConvertRes2ResList convert resource type from api.Resource in scheduler to v1.ResourceList in yaml +func ConvertRes2ResList(res *api.Resource) v1.ResourceList { + var rl = v1.ResourceList{} + rl[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(res.MilliCPU), resource.DecimalSI) + rl[v1.ResourceMemory] = *resource.NewQuantity(int64(res.Memory), resource.BinarySI) + for resourceName, f := range res.ScalarResources { + rl[resourceName] = *resource.NewMilliQuantity(int64(f), resource.DecimalSI) + } + return rl +}