scheduler: support reserved resources inside reservations and fix preempting pods' resource accounting
Co-authored-by: shenxin <rougang.hrg@alibaba-inc.com>
Signed-off-by: saintube <saintube@foxmail.com>
saintube and shenxin committed Jan 9, 2025
1 parent df3d309 commit 7275544
Showing 3 changed files with 156 additions and 7 deletions.
6 changes: 6 additions & 0 deletions pkg/scheduler/frameworkext/reservation_info.go
@@ -40,6 +40,7 @@ type ReservationInfo struct {
ResourceNames []corev1.ResourceName
Allocatable corev1.ResourceList
Allocated corev1.ResourceList
+ Reserved corev1.ResourceList // reserved inside the reservation
AllocatablePorts framework.HostPortInfo
AllocatedPorts framework.HostPortInfo
AssignedPods map[types.UID]*PodRequirement
@@ -80,6 +81,7 @@ func (p *PodRequirement) Clone() *PodRequirement {
func NewReservationInfo(r *schedulingv1alpha1.Reservation) *ReservationInfo {
var parseErrors []error
allocatable := reservationutil.ReservationRequests(r)
+ reserved := util.GetNodeReservationFromAnnotation(r.Annotations)
resourceNames := quotav1.ResourceNames(allocatable)
if r.Spec.AllocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyRestricted {
options, err := apiext.GetReservationRestrictedOptions(r.Annotations)
@@ -107,6 +109,7 @@ func NewReservationInfo(r *schedulingv1alpha1.Reservation) *ReservationInfo {
Pod: reservedPod,
ResourceNames: resourceNames,
Allocatable: allocatable,
+ Reserved: reserved,
AllocatablePorts: util.RequestedHostPorts(reservedPod),
AssignedPods: map[types.UID]*PodRequirement{},
OwnerMatchers: ownerMatchers,
@@ -118,6 +121,7 @@ func NewReservationInfoFromPod(pod *corev1.Pod) *ReservationInfo {
var parseErrors []error

allocatable := resource.PodRequests(pod, resource.PodResourcesOptions{})
+ reserved := util.GetNodeReservationFromAnnotation(pod.Annotations)
resourceNames := quotav1.ResourceNames(allocatable)
options, err := apiext.GetReservationRestrictedOptions(pod.Annotations)
if err == nil {
@@ -148,6 +152,7 @@ func NewReservationInfoFromPod(pod *corev1.Pod) *ReservationInfo {
Pod: pod,
ResourceNames: resourceNames,
Allocatable: allocatable,
+ Reserved: reserved,
AllocatablePorts: util.RequestedHostPorts(pod),
AssignedPods: map[types.UID]*PodRequirement{},
OwnerMatchers: ownerMatchers,
@@ -344,6 +349,7 @@ func (ri *ReservationInfo) Clone() *ReservationInfo {
ResourceNames: resourceNames,
Allocatable: ri.Allocatable.DeepCopy(),
Allocated: ri.Allocated.DeepCopy(),
+ Reserved: ri.Reserved.DeepCopy(),
AllocatablePorts: util.CloneHostPorts(ri.AllocatablePorts),
AllocatedPorts: util.CloneHostPorts(ri.AllocatedPorts),
AssignedPods: assignedPods,
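
The new Reserved field is decoded from the reservation's node-reservation annotation by util.GetNodeReservationFromAnnotation. Below is a minimal sketch of the decoding involved, assuming a payload shaped like the `{"resources": {"cpu": "1"}}` value used in the tests further down; the helper name, struct fields, and the literal annotation key standing in for apiext.AnnotationNodeReservation are assumptions, not the repository's exact definitions.

package main

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// nodeReservation mirrors the annotation payload seen in the tests:
// {"resources": {"cpu": "1"}}. The field names here are assumptions.
type nodeReservation struct {
	Resources corev1.ResourceList `json:"resources,omitempty"`
}

// getNodeReservationFromAnnotation is a hypothetical stand-in for
// util.GetNodeReservationFromAnnotation.
func getNodeReservationFromAnnotation(annotations map[string]string, key string) corev1.ResourceList {
	raw, ok := annotations[key]
	if !ok {
		return nil
	}
	var nr nodeReservation
	if err := json.Unmarshal([]byte(raw), &nr); err != nil {
		return nil // treat a malformed annotation as "nothing reserved"
	}
	return nr.Resources
}

func main() {
	const key = "node.koordinator.sh/reservation" // assumed literal for apiext.AnnotationNodeReservation
	ann := map[string]string{
		key: `{"resources": {"cpu": "1"}}`,
	}
	reserved := getNodeReservationFromAnnotation(ann, key)
	fmt.Println(reserved.Cpu().String()) // 1
}

In this sketch a missing or malformed annotation yields nil, so reservations that reserve nothing internally keep their previous behavior.
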
13 changes: 10 additions & 3 deletions pkg/scheduler/plugins/reservation/plugin.go
@@ -562,7 +562,8 @@ func fitsNode(podRequest *framework.Resource, nodeInfo *framework.NodeInfo, node

var rRemained *framework.Resource
if rInfo != nil {
- resources := quotav1.Subtract(rInfo.Allocatable, rInfo.Allocated)
+ // Reservation available = Allocatable - Allocated - InnerReserved
+ resources := quotav1.Subtract(quotav1.Subtract(rInfo.Allocatable, rInfo.Allocated), rInfo.Reserved)
rRemained = framework.NewResource(resources)
} else {
rRemained = dummyResource
@@ -603,17 +604,18 @@ func fitsReservation(podRequest corev1.ResourceList, rInfo *frameworkext.Reserva
if len(preemptibleInRR) > 0 {
allocated = quotav1.SubtractWithNonNegativeResult(allocated, preemptibleInRR)
}
- allocatable := rInfo.Allocatable
allocated = quotav1.Mask(allocated, rInfo.ResourceNames)
+ reserved := quotav1.Mask(rInfo.Reserved, rInfo.ResourceNames)
requests := quotav1.Mask(podRequest, rInfo.ResourceNames)
+ allocatable := rInfo.Allocatable

var insufficientResourceReasons []string

// check "pods" resource in the reservation when reserved explicitly
if maxPods, found := allocatable[corev1.ResourcePods]; found {
allocatedPods := rInfo.GetAllocatedPods()
if preemptiblePodsInRR, found := preemptibleInRR[corev1.ResourcePods]; found {
- allocatedPods += int(preemptiblePodsInRR.Value()) // assert no overflow
+ allocatedPods -= int(preemptiblePodsInRR.Value()) // assert no overflow
}
if int64(allocatedPods)+1 > maxPods.Value() {
if !isDetailed {
@@ -640,6 +642,11 @@ func fitsReservation(podRequest corev1.ResourceList, rInfo *frameworkext.Reserva
if !found {
used = *resource.NewQuantity(0, resource.DecimalSI)
}
+ reservedQ, found := reserved[resourceName]
+ if found {
+ // NOTE: capacity excludes the reserved resource
+ capacity.Sub(reservedQ)
+ }
remained := capacity.DeepCopy()
remained.Sub(used)

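
Two behavioral changes above are worth spelling out. First, the resources a reservation holds back for itself no longer count as free: available = Allocatable - Allocated - InnerReserved, and the per-resource loop likewise subtracts the reserved quantity from capacity before comparing. Second, preemptibleInRR now carries positive quantities that are subtracted from the allocation, which is why the pods counter flips from += to -=. A small self-contained sketch of that arithmetic, assuming the quotav1 helpers come from k8s.io/apiserver/pkg/quota/v1 and using illustrative numbers:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	quotav1 "k8s.io/apiserver/pkg/quota/v1"
)

func main() {
	allocatable := corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("7")}
	allocated := corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("2")}
	reserved := corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1")}
	// Resources freed by pods being preempted inside the reservation are
	// stored as positive quantities and subtracted from the allocation.
	preemptibleInRR := corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1")}

	allocated = quotav1.SubtractWithNonNegativeResult(allocated, preemptibleInRR)
	// Reservation available = Allocatable - Allocated - InnerReserved
	available := quotav1.Subtract(quotav1.Subtract(allocatable, allocated), reserved)

	cpu := available[corev1.ResourceCPU]
	fmt.Println(cpu.String()) // 5 = 7 - (2-1) - 1
}
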
144 changes: 140 additions & 4 deletions pkg/scheduler/plugins/reservation/plugin_test.go
@@ -788,6 +788,9 @@ func Test_filterWithReservations(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "test-r",
UID: "123456",
+ Annotations: map[string]string{
+ apiext.AnnotationNodeReservation: `{"resources": {"cpu": "1"}}`,
+ },
},
Spec: schedulingv1alpha1.ReservationSpec{
AllocatePolicy: schedulingv1alpha1.ReservationAllocatePolicyRestricted,
@@ -797,7 +800,7 @@
{
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("6"),
corev1.ResourceCPU: resource.MustParse("7"),
corev1.ResourcePods: resource.MustParse("2"),
},
},
@@ -808,7 +811,7 @@
},
Status: schedulingv1alpha1.ReservationStatus{
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("6"),
corev1.ResourceCPU: resource.MustParse("7"),
corev1.ResourcePods: resource.MustParse("2"),
},
},
@@ -1010,6 +1013,53 @@
},
wantStatus: nil,
},
+ {
+ name: "filter restricted reservation with affinity",
+ stateData: &stateData{
+ schedulingStateData: schedulingStateData{
+ hasAffinity: true,
+ podRequests: corev1.ResourceList{
+ corev1.ResourceCPU: resource.MustParse("6"),
+ corev1.ResourceMemory: resource.MustParse("8Gi"),
+ },
+ nodeReservationStates: map[string]*nodeReservationState{
+ node.Name: {
+ podRequested: &framework.Resource{
+ MilliCPU: 30 * 1000,
+ Memory: 24 * 1024 * 1024 * 1024,
+ },
+ rAllocated: &framework.Resource{
+ MilliCPU: 0,
+ },
+ matchedOrIgnored: []*frameworkext.ReservationInfo{
+ frameworkext.NewReservationInfo(&schedulingv1alpha1.Reservation{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-r",
+ },
+ Spec: schedulingv1alpha1.ReservationSpec{
+ AllocatePolicy: schedulingv1alpha1.ReservationAllocatePolicyRestricted,
+ Template: &corev1.PodTemplateSpec{
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Resources: corev1.ResourceRequirements{
+ Requests: corev1.ResourceList{
+ corev1.ResourceCPU: resource.MustParse("6"),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ }),
+ },
+ },
+ },
+ },
+ },
+ wantStatus: nil,
+ },
{
name: "filter restricted reservation with nodeInfo and matched requests are zero",
stateData: &stateData{
@@ -1237,6 +1287,56 @@
},
wantStatus: nil,
},
+ {
+ name: "failed to filter restricted reservation due to reserved",
+ stateData: &stateData{
+ schedulingStateData: schedulingStateData{
+ hasAffinity: true,
+ podRequests: corev1.ResourceList{
+ corev1.ResourceCPU: resource.MustParse("6"),
+ corev1.ResourceMemory: resource.MustParse("8Gi"),
+ },
+ nodeReservationStates: map[string]*nodeReservationState{
+ node.Name: {
+ podRequested: &framework.Resource{
+ MilliCPU: 30 * 1000,
+ Memory: 24 * 1024 * 1024 * 1024,
+ },
+ rAllocated: &framework.Resource{
+ MilliCPU: 0,
+ },
+ matchedOrIgnored: []*frameworkext.ReservationInfo{
+ frameworkext.NewReservationInfo(&schedulingv1alpha1.Reservation{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-r",
+ Annotations: map[string]string{
+ apiext.AnnotationNodeReservation: `{"resources": {"cpu": "2"}}`,
+ },
+ },
+ Spec: schedulingv1alpha1.ReservationSpec{
+ AllocatePolicy: schedulingv1alpha1.ReservationAllocatePolicyRestricted,
+ Template: &corev1.PodTemplateSpec{
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Resources: corev1.ResourceRequirements{
+ Requests: corev1.ResourceList{
+ corev1.ResourceCPU: resource.MustParse("6"),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ }),
+ },
+ },
+ },
+ },
+ },
+ wantStatus: framework.NewStatus(framework.Unschedulable, "Reservation(s) Insufficient cpu"),
+ },
{
name: "filter default reservations with preemption",
stateData: &stateData{
@@ -1853,8 +1953,8 @@ func Test_filterWithReservations(t *testing.T) {
preemptibleInRRs: map[string]map[types.UID]corev1.ResourceList{
node.Name: {
"123456": {
corev1.ResourceCPU: resource.MustParse("-1"),
corev1.ResourcePods: resource.MustParse("-1"),
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourcePods: resource.MustParse("1"),
},
},
},
@@ -1904,6 +2004,42 @@
wantStatus: framework.NewStatus(framework.Unschedulable, "Reservation(s) Too many pods, "+
"requested: 1, used: 2, capacity: 2"),
},
+ {
+ name: "failed to filter restricted reservation with name and reserved since insufficient resource",
+ stateData: &stateData{
+ schedulingStateData: schedulingStateData{
+ hasAffinity: true,
+ reservationName: "test-r",
+ podRequests: corev1.ResourceList{
+ corev1.ResourceCPU: resource.MustParse("6"),
+ },
+ preemptibleInRRs: map[string]map[types.UID]corev1.ResourceList{
+ node.Name: {
+ "123456": {
+ corev1.ResourceCPU: resource.MustParse("1"),
+ corev1.ResourcePods: resource.MustParse("1"),
+ },
+ },
+ },
+ nodeReservationStates: map[string]*nodeReservationState{
+ node.Name: {
+ podRequested: &framework.Resource{
+ MilliCPU: 30 * 1000,
+ Memory: 24 * 1024 * 1024 * 1024,
+ },
+ rAllocated: &framework.Resource{
+ MilliCPU: 2000,
+ },
+ matchedOrIgnored: []*frameworkext.ReservationInfo{
+ testRInfo.Clone(),
+ },
+ },
+ },
+ },
+ },
+ wantStatus: framework.NewStatus(framework.Unschedulable, "Reservation(s) Insufficient cpu, "+
+ "requested: 6000, used: 1000, capacity: 6000"),
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
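
The expected status in the last new case can be reproduced by hand, assuming testRInfo (defined earlier in the test file) matches the reservation updated above, with 7 CPU allocatable, 1 CPU reserved via the annotation, and, by assumption, 2 CPU already allocated: capacity is 7 - 1 = 6, used is 2 minus 1 preemptible = 1, and 6 requested plus 1 used exceeds 6, so the filter reports requested: 6000, used: 1000, capacity: 6000 in milli-units. A standalone sketch of that per-resource check:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	capacity := resource.MustParse("7") // rInfo.Allocatable[cpu]
	reservedQ := resource.MustParse("1")
	// NOTE: capacity excludes the reserved resource
	capacity.Sub(reservedQ)

	used := resource.MustParse("1") // allocated minus preemptible-in-reservation
	remained := capacity.DeepCopy()
	remained.Sub(used)

	request := resource.MustParse("6")
	if request.Cmp(remained) > 0 {
		fmt.Printf("Insufficient cpu, requested: %d, used: %d, capacity: %d\n",
			request.MilliValue(), used.MilliValue(), capacity.MilliValue())
	}
}
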
