Skip to content

Commit

Permalink
scheduler: revise ReservationFilterPlugin
Browse files Browse the repository at this point in the history
Co-authored-by: shenxin <rougang.hrg@alibaba-inc.com>
Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube and shenxin committed Jan 9, 2025
1 parent 1e0be0b commit df3d309
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 29 deletions.
18 changes: 16 additions & 2 deletions pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,23 @@ func (ext *frameworkExtenderImpl) RunReservationExtensionFinalRestoreReservation
}

// RunReservationFilterPlugins determines whether the Reservation can participate in the Reserve
func (ext *frameworkExtenderImpl) RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status {
func (ext *frameworkExtenderImpl) RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
for _, pl := range ext.reservationFilterPlugins {
status := pl.FilterReservation(ctx, cycleState, pod, reservationInfo, nodeName)
status := pl.FilterReservation(ctx, cycleState, pod, reservationInfo, nodeInfo)
if !status.IsSuccess() {
if debugFilterFailure {
klog.Infof("Failed to FilterWithReservation for Pod %q with Reservation %q on Node %q, failedPlugin: %s, reason: %s", klog.KObj(pod), klog.KObj(reservationInfo), nodeInfo.Node().Name, pl.Name(), status.Message())
}
return status
}
}
return nil
}

// RunNominateReservationFilterPlugins determines whether the Reservation can participate in the Reserve.
func (ext *frameworkExtenderImpl) RunNominateReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status {
for _, pl := range ext.reservationFilterPlugins {
status := pl.FilterNominateReservation(ctx, cycleState, pod, reservationInfo, nodeName)
if !status.IsSuccess() {
if debugFilterFailure {
klog.Infof("Failed to FilterReservation for Pod %q with Reservation %q on Node %q, failedPlugin: %s, reason: %s", klog.KObj(pod), klog.KObj(reservationInfo), nodeName, pl.Name(), status.Message())
Expand Down
97 changes: 94 additions & 3 deletions pkg/scheduler/frameworkext/framework_extender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,18 @@ type fakeReservationFilterPlugin struct {

func (f *fakeReservationFilterPlugin) Name() string { return "fakeReservationFilterPlugin" }

func (f *fakeReservationFilterPlugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status {
func (f *fakeReservationFilterPlugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationInfo.Reservation.Annotations == nil {
reservationInfo.Reservation.Annotations = map[string]string{}
}
reservationInfo.Reservation.Annotations[fmt.Sprintf("reservationFilterWithPlugin-%d", f.index)] = fmt.Sprintf("%d", f.index)
if f.err != nil {
return framework.AsStatus(f.err)
}
return nil
}

func (f *fakeReservationFilterPlugin) FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status {
if reservationInfo.Reservation.Annotations == nil {
reservationInfo.Reservation.Annotations = map[string]string{}
}
Expand All @@ -750,7 +761,87 @@ func (f *fakeReservationFilterPlugin) FilterReservation(ctx context.Context, cyc
return nil
}

func TestReservationFilterPlugin(t *testing.T) {
func TestRunReservationFilterPlugins(t *testing.T) {
testNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
},
}
testNodeInfo := framework.NewNodeInfo()
testNodeInfo.SetNode(testNode)
tests := []struct {
name string
reservation *schedulingv1alpha1.Reservation
plugins []*fakeReservationFilterPlugin
wantAnnotations map[string]string
wantStatus bool
}{
{
name: "filter reservation succeeded",
reservation: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "test-reservation",
},
},
plugins: []*fakeReservationFilterPlugin{
{index: 1},
{index: 2},
},
wantAnnotations: map[string]string{
"reservationFilterWithPlugin-1": "1",
"reservationFilterWithPlugin-2": "2",
},
wantStatus: true,
},
{
name: "first plugin failed",
reservation: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "test-reservation",
},
},
plugins: []*fakeReservationFilterPlugin{
{index: 1, err: errors.New("failed")},
{index: 2},
},
wantAnnotations: map[string]string{
"reservationFilterWithPlugin-1": "1",
},
wantStatus: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
registeredPlugins := []schedulertesting.RegisterPluginFunc{
schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
}
fh, err := schedulertesting.NewFramework(
context.TODO(),
registeredPlugins,
"koord-scheduler",
)
assert.NoError(t, err)

extenderFactory, _ := NewFrameworkExtenderFactory()

extender := NewFrameworkExtender(extenderFactory, fh)
impl := extender.(*frameworkExtenderImpl)
for _, pl := range tt.plugins {
impl.updatePlugins(pl)
}

cycleState := framework.NewCycleState()

reservationInfo := NewReservationInfo(tt.reservation)
status := extender.RunReservationFilterPlugins(context.TODO(), cycleState, &corev1.Pod{}, reservationInfo, testNodeInfo)
assert.Equal(t, tt.wantStatus, status.IsSuccess())
assert.Equal(t, tt.wantAnnotations, reservationInfo.Reservation.Annotations)
})
}
}

func TestRunNominateReservationFilterPlugins(t *testing.T) {
tests := []struct {
name string
reservation *schedulingv1alpha1.Reservation
Expand Down Expand Up @@ -816,7 +907,7 @@ func TestReservationFilterPlugin(t *testing.T) {
cycleState := framework.NewCycleState()

reservationInfo := NewReservationInfo(tt.reservation)
status := extender.RunReservationFilterPlugins(context.TODO(), cycleState, &corev1.Pod{}, reservationInfo, "test-node-1")
status := extender.RunNominateReservationFilterPlugins(context.TODO(), cycleState, &corev1.Pod{}, reservationInfo, "test-node-1")
assert.Equal(t, tt.wantStatus, status.IsSuccess())
assert.Equal(t, tt.wantAnnotations, reservationInfo.Reservation.Annotations)
})
Expand Down
11 changes: 8 additions & 3 deletions pkg/scheduler/frameworkext/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ type FrameworkExtender interface {
// DEPRECATED: use RunReservationExtensionRestoreReservation instead.
RunReservationExtensionFinalRestoreReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, states PluginToNodeReservationRestoreStates) *framework.Status

RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status
RunNominateReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
RunReservationScorePlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfos []*ReservationInfo, nodeName string) (PluginToReservationScores, *framework.Status)

RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status
Expand Down Expand Up @@ -120,10 +121,14 @@ type ReservationRestorePlugin interface {
}

// ReservationFilterPlugin is an interface for Filter Reservation plugins.
// These plugins will be called during the Reserve phase to determine whether the Reservation can participate in the Reserve
// FilterReservation will be called in the Filter phase for determining which reservations are available.
// FilterNominateReservation will be called in the PreScore or the Reserve phase to nominate a reservation whether it
// can participate the Reserve.
// TODO: Looking forward a merged method.
type ReservationFilterPlugin interface {
framework.Plugin
FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status
FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
}

// ReservationNominator nominates a more suitable Reservation in the Reserve stage and Pod will bind this Reservation.
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/plugins/deviceshare/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,11 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
return status
}

func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
return nil
}

func (p *Plugin) FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
state, status := getPreFilterState(cycleState)
if !status.IsSuccess() {
return status
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/plugins/deviceshare/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2194,7 +2194,7 @@ func Test_Plugin_Filter(t *testing.T) {
}
}

func Test_Plugin_FilterReservation(t *testing.T) {
func Test_Plugin_FilterNominateReservation(t *testing.T) {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand Down Expand Up @@ -2294,7 +2294,7 @@ func Test_Plugin_FilterReservation(t *testing.T) {
})
assert.True(t, status.IsSuccess())

status = pl.FilterReservation(context.TODO(), cycleState, pod, reservationInfo, "test-node-1")
status = pl.FilterNominateReservation(context.TODO(), cycleState, pod, reservationInfo, "test-node-1")
assert.True(t, status.IsSuccess())

allocatedPod := &corev1.Pod{
Expand Down Expand Up @@ -2334,7 +2334,7 @@ func Test_Plugin_FilterReservation(t *testing.T) {
})
assert.True(t, status.IsSuccess())

status = pl.FilterReservation(context.TODO(), cycleState, pod, reservationInfo, "test-node-1")
status = pl.FilterNominateReservation(context.TODO(), cycleState, pod, reservationInfo, "test-node-1")
assert.Equal(t, framework.NewStatus(framework.Unschedulable, "Reservation(s) Insufficient gpu devices"), status)
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/plugins/nodenumaresource/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,11 @@ func (p *Plugin) filterAmplifiedCPUs(podRequestMilliCPU int64, nodeInfo *framewo
return nil
}

func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
return nil
}

func (p *Plugin) FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
state, status := getPreFilterState(cycleState)
if !status.IsSuccess() {
return status
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/plugins/nodenumaresource/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ func TestFilterWithAmplifiedCPUs(t *testing.T) {
}
}

func TestPlugin_FilterReservation(t *testing.T) {
func TestPlugin_FilterNominateReservation(t *testing.T) {
skipState := framework.NewCycleState()
skipState.Write(stateKey, &preFilterState{
skip: true,
Expand Down Expand Up @@ -1077,7 +1077,7 @@ func TestPlugin_FilterReservation(t *testing.T) {
assert.NotNil(t, p)
assert.Nil(t, err)
pl := p.(*Plugin)
got := pl.FilterReservation(context.TODO(), tt.args.cycleState, tt.args.pod, tt.args.reservationInfo, tt.args.nodeName)
got := pl.FilterNominateReservation(context.TODO(), tt.args.cycleState, tt.args.pod, tt.args.reservationInfo, tt.args.nodeName)
assert.Equal(t, tt.want, got)
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/reservation/nominator.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (pl *Plugin) NominateReservation(ctx context.Context, cycleState *framework

reservations := make([]*frameworkext.ReservationInfo, 0, len(reservationInfos))
for i := range reservationInfos {
status := extender.RunReservationFilterPlugins(ctx, cycleState, pod, reservationInfos[i], nodeName)
status := extender.RunNominateReservationFilterPlugins(ctx, cycleState, pod, reservationInfos[i], nodeName)
if !status.IsSuccess() {
continue
}
Expand Down
39 changes: 29 additions & 10 deletions pkg/scheduler/plugins/reservation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,11 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew
return nil
}

extender, ok := pl.handle.(frameworkext.FrameworkExtender)
if !ok {
return framework.AsStatus(fmt.Errorf("not implemented frameworkext.FrameworkExtender"))
}

node := nodeInfo.Node()
state := getStateData(cycleState)
nodeRState := state.nodeReservationStates[node.Name]
Expand Down Expand Up @@ -494,22 +499,32 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew
insufficientResourcesByNode := fitsNode(state.podRequestsResources, nodeInfo, nodeRState, rInfo, preemptible)
state.preemptLock.RUnlock()

nodeFits := len(insufficientResourcesByNode) == 0
nodeFits := len(insufficientResourcesByNode) <= 0
allInsufficientResourcesByNode.Insert(insufficientResourcesByNode...)

reservationFits := false
allocatePolicy := rInfo.GetAllocatePolicy()
if allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyDefault ||
allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyAligned {
if nodeFits {
return nil
}
allInsufficientResourcesByNode.Insert(insufficientResourcesByNode...)
reservationFits = nodeFits
} else if allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyRestricted {
insufficientResourceReasonsByReservation := fitsReservation(state.podRequests, rInfo, preemptibleInRR, requireDetailReasons)
if nodeFits && len(insufficientResourceReasonsByReservation) <= 0 { // fit the reservation
return nil
}
allInsufficientResourcesByNode.Insert(insufficientResourcesByNode...)

reservationFits = len(insufficientResourceReasonsByReservation) <= 0
allInsufficientResourceReasonsByReservation = append(allInsufficientResourceReasonsByReservation, insufficientResourceReasonsByReservation...)
}

// Before nominating a reservation in PreScore or Reserve, check the reservation by multiple plugins to make
// the Filter phase give a more accurate result. It is extensible to support more policies.
status := extender.RunReservationFilterPlugins(ctx, cycleState, pod, rInfo, nodeInfo)
if !status.IsSuccess() {
allInsufficientResourceReasonsByReservation = append(allInsufficientResourceReasonsByReservation, status.Reasons()...)
continue
}

if nodeFits && reservationFits {
return nil
}
}

// The Pod requirement must be allocated from Reservation, but currently no Reservation meets the requirement.
Expand Down Expand Up @@ -781,7 +796,11 @@ func (pl *Plugin) makePostFilterReasons(state *stateData, filteredNodeStatusMap
return reasons
}

func (pl *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
func (pl *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
return nil
}

func (pl *Plugin) FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
// TODO(joseph): We can consider optimizing these codes. It seems that there is no need to exist at present.
state := getStateData(cycleState)

Expand Down
10 changes: 7 additions & 3 deletions pkg/scheduler/plugins/reservation/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1907,7 +1907,11 @@ func Test_filterWithReservations(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pl := &Plugin{}
suit := newPluginTestSuit(t)
p, err := suit.pluginFactory()
assert.NoError(t, err)
pl := p.(*Plugin)
suit.start()
cycleState := framework.NewCycleState()
if tt.stateData.podRequestsResources == nil {
resources := framework.NewResource(tt.stateData.podRequests)
Expand Down Expand Up @@ -2224,7 +2228,7 @@ func TestPreFilterExtensionRemovePod(t *testing.T) {
}
}

func TestFilterReservation(t *testing.T) {
func TestFilterNominateReservation(t *testing.T) {
reservation4C8G := &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
UID: uuid.NewUUID(),
Expand Down Expand Up @@ -2415,7 +2419,7 @@ func TestFilterReservation(t *testing.T) {
cycleState.Write(stateKey, state)

rInfo := frameworkext.NewReservationInfo(tt.targetReservation)
status := pl.FilterReservation(context.TODO(), cycleState, pod, rInfo, node.Name)
status := pl.FilterNominateReservation(context.TODO(), cycleState, pod, rInfo, node.Name)
assert.Equal(t, tt.wantStatus, status)
})
}
Expand Down

0 comments on commit df3d309

Please sign in to comment.