Skip to content

Commit

Permalink
scheduler: improve the failed event of Reservation's scheduling (#1947)
Browse files Browse the repository at this point in the history
Signed-off-by: wangjianyu.wjy <wangjianyu.wjy@alibaba-inc.com>
Co-authored-by: wangjianyu.wjy <wangjianyu.wjy@alibaba-inc.com>
  • Loading branch information
ZiMengSheng and wangjianyu.wjy authored Mar 13, 2024
1 parent 42bca6f commit a85121a
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 21 deletions.
50 changes: 35 additions & 15 deletions pkg/scheduler/plugins/reservation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ func (pl *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState,
if len(state.preemptible[node.Name]) > 0 || len(state.preemptibleInRRs[node.Name]) > 0 {
preemptible := state.preemptible[node.Name]
preemptibleResource := framework.NewResource(preemptible)
nodeFits := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, nil, preemptibleResource)
if !nodeFits {
insufficientResources := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, nil, preemptibleResource)
if len(insufficientResources) != 0 {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed)
}
}
Expand All @@ -378,10 +378,11 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew
node := nodeInfo.Node()
state := getStateData(cycleState)
nodeRState := state.nodeReservationStates[node.Name]
podRequests, _ := resourceapi.PodRequestsAndLimits(pod)
podRequestsResourceNames := quotav1.ResourceNames(podRequests)
podRequestsResourceNames := quotav1.ResourceNames(state.podRequests)

var hasSatisfiedReservation bool
allInsufficientResourcesByNode := sets.NewString()
allInsufficientResourcesByReservation := sets.NewString()
for _, rInfo := range matchedReservations {
resourceNames := quotav1.Intersection(rInfo.ResourceNames, podRequestsResourceNames)
if len(resourceNames) == 0 {
Expand All @@ -391,14 +392,16 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew
preemptibleInRR := state.preemptibleInRRs[node.Name][rInfo.UID()]
preemptible := framework.NewResource(preemptibleInRR)
preemptible.Add(state.preemptible[node.Name])
nodeFits := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, rInfo, preemptible)
insufficientResourcesByNode := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, rInfo, preemptible)
nodeFits := len(insufficientResourcesByNode) == 0
allocatePolicy := rInfo.GetAllocatePolicy()
if allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyDefault ||
allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyAligned {
if nodeFits {
hasSatisfiedReservation = true
break
}
allInsufficientResourcesByNode.Insert(insufficientResourcesByNode...)
} else if allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyRestricted {
allocated := rInfo.Allocated
if len(preemptibleInRR) > 0 {
Expand All @@ -407,34 +410,51 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew
allocated = quotav1.Mask(allocated, rInfo.ResourceNames)
rRemained := quotav1.SubtractWithNonNegativeResult(rInfo.Allocatable, allocated)
requests := quotav1.Mask(state.podRequests, rInfo.ResourceNames)
fits, _ := quotav1.LessThanOrEqual(requests, rRemained)
fits, insufficientResourcesByReservation := quotav1.LessThanOrEqual(requests, rRemained)
if fits && nodeFits {
hasSatisfiedReservation = true
break
}
allInsufficientResourcesByNode.Insert(insufficientResourcesByNode...)
for _, insufficientResourceByReservation := range insufficientResourcesByReservation {
allInsufficientResourcesByReservation.Insert(string(insufficientResourceByReservation))
}
}
}
// The Pod requirement must be allocated from Reservation, but currently no Reservation meets the requirement
if !hasSatisfiedReservation && requiredFromReservation {
return framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements)
// We will keep all failure reasons.
failureReasons := make([]string, 0, len(allInsufficientResourcesByNode)+len(allInsufficientResourcesByReservation)+1)
for insufficientResourceByNode := range allInsufficientResourcesByNode {
failureReasons = append(failureReasons, fmt.Sprintf("Insufficient %s by node", insufficientResourceByNode))
}
for insufficientResourceByReservation := range allInsufficientResourcesByReservation {
failureReasons = append(failureReasons, fmt.Sprintf("Insufficient %s by reservation", insufficientResourceByReservation))
}
if len(failureReasons) == 0 {
failureReasons = append(failureReasons, ErrReasonNoReservationsMeetRequirements)
}
return framework.NewStatus(framework.Unschedulable, failureReasons...)
}
return nil
}

var dummyResource = framework.NewResource(nil)

// fitsNode checks if node have enough resources to host the pod.
func fitsNode(podRequest *framework.Resource, nodeInfo *framework.NodeInfo, nodeRState *nodeReservationState, rInfo *frameworkext.ReservationInfo, preemptible *framework.Resource) bool {
func fitsNode(podRequest *framework.Resource, nodeInfo *framework.NodeInfo, nodeRState *nodeReservationState, rInfo *frameworkext.ReservationInfo, preemptible *framework.Resource) []string {
var insufficientResources []string

allowedPodNumber := nodeInfo.Allocatable.AllowedPodNumber
if len(nodeInfo.Pods)-len(nodeRState.matched)+1 > allowedPodNumber {
return false
insufficientResources = append(insufficientResources, string(corev1.ResourcePods))
}

if podRequest.MilliCPU == 0 &&
podRequest.Memory == 0 &&
podRequest.EphemeralStorage == 0 &&
len(podRequest.ScalarResources) == 0 {
return true
return insufficientResources
}

var rRemained *framework.Resource
Expand All @@ -457,22 +477,22 @@ func fitsNode(podRequest *framework.Resource, nodeInfo *framework.NodeInfo, node
}

if podRequest.MilliCPU > (nodeInfo.Allocatable.MilliCPU - (podRequested.MilliCPU - rRemained.MilliCPU - allRAllocated.MilliCPU - preemptible.MilliCPU)) {
return false
insufficientResources = append(insufficientResources, string(corev1.ResourceCPU))
}
if podRequest.Memory > (nodeInfo.Allocatable.Memory - (podRequested.Memory - rRemained.Memory - allRAllocated.Memory - preemptible.Memory)) {
return false
insufficientResources = append(insufficientResources, string(corev1.ResourceMemory))
}
if podRequest.EphemeralStorage > (nodeInfo.Allocatable.EphemeralStorage - (podRequested.EphemeralStorage - rRemained.EphemeralStorage - allRAllocated.EphemeralStorage - preemptible.EphemeralStorage)) {
return false
insufficientResources = append(insufficientResources, string(corev1.ResourceEphemeralStorage))
}

for rName, rQuant := range podRequest.ScalarResources {
if rQuant > (nodeInfo.Allocatable.ScalarResources[rName] - (podRequested.ScalarResources[rName] - rRemained.ScalarResources[rName] - allRAllocated.ScalarResources[rName] - preemptible.ScalarResources[rName])) {
return false
insufficientResources = append(insufficientResources, string(rName))
}
}

return true
return insufficientResources
}

func (pl *Plugin) PostFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
Expand Down
21 changes: 15 additions & 6 deletions pkg/scheduler/plugins/reservation/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ func Test_filterWithReservations(t *testing.T) {
},
},
},
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements),
wantStatus: framework.NewStatus(framework.Unschedulable, "Insufficient cpu by node"),
},
{
name: "filter restricted reservation with nodeInfo",
Expand Down Expand Up @@ -861,7 +861,7 @@ func Test_filterWithReservations(t *testing.T) {
},
},
},
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements),
wantStatus: framework.NewStatus(framework.Unschedulable, "Insufficient cpu by reservation"),
},
{
name: "filter default reservations with preemption",
Expand Down Expand Up @@ -962,6 +962,9 @@ func Test_filterWithReservations(t *testing.T) {
name: "failed to filter default reservations with preempt from reservation",
stateData: &stateData{
hasAffinity: true,
podRequests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("4"),
},
podRequestsResources: &framework.Resource{
MilliCPU: 4 * 1000,
},
Expand Down Expand Up @@ -991,6 +994,7 @@ func Test_filterWithReservations(t *testing.T) {
AllocatePolicy: schedulingv1alpha1.ReservationAllocatePolicyDefault,
},
},
ResourceNames: []corev1.ResourceName{corev1.ResourceCPU},
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("6"),
},
Expand All @@ -1002,12 +1006,15 @@ func Test_filterWithReservations(t *testing.T) {
},
},
},
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements),
wantStatus: framework.NewStatus(framework.Unschedulable, "Insufficient cpu by node"),
},
{
name: "failed to filter default reservations with preempt from node",
stateData: &stateData{
hasAffinity: true,
podRequests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("4"),
},
podRequestsResources: &framework.Resource{
MilliCPU: 4 * 1000,
},
Expand Down Expand Up @@ -1035,6 +1042,7 @@ func Test_filterWithReservations(t *testing.T) {
AllocatePolicy: schedulingv1alpha1.ReservationAllocatePolicyDefault,
},
},
ResourceNames: []corev1.ResourceName{corev1.ResourceCPU},
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("6"),
},
Expand All @@ -1046,7 +1054,7 @@ func Test_filterWithReservations(t *testing.T) {
},
},
},
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements),
wantStatus: framework.NewStatus(framework.Unschedulable, "Insufficient cpu by node"),
},
{
name: "filter restricted reservations with preempt from reservation",
Expand Down Expand Up @@ -1131,6 +1139,7 @@ func Test_filterWithReservations(t *testing.T) {
AllocatePolicy: schedulingv1alpha1.ReservationAllocatePolicyRestricted,
},
},
ResourceNames: []corev1.ResourceName{corev1.ResourceCPU},
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("6"),
},
Expand All @@ -1142,7 +1151,7 @@ func Test_filterWithReservations(t *testing.T) {
},
},
},
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements),
wantStatus: framework.NewStatus(framework.Unschedulable, "Insufficient cpu by reservation"),
},
{
name: "failed to filter restricted reservations with preempt from reservation and node",
Expand Down Expand Up @@ -1199,7 +1208,7 @@ func Test_filterWithReservations(t *testing.T) {
},
},
},
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements),
wantStatus: framework.NewStatus(framework.Unschedulable, "Insufficient cpu by reservation"),
},
{
name: "filter restricted reservations with preempt from reservation and node",
Expand Down

0 comments on commit a85121a

Please sign in to comment.