Skip to content

Commit

Permalink
scheduler: revise ReservationFilterPlugin
Browse files Browse the repository at this point in the history
Co-authored-by: shenxin <rougang.hrg@alibaba-inc.com>
Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube and shenxin committed Jan 7, 2025
1 parent b3ea5b6 commit bc4666d
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 6 deletions.
16 changes: 15 additions & 1 deletion pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,21 @@ func (ext *frameworkExtenderImpl) RunReservationExtensionFinalRestoreReservation
}

// RunReservationFilterPlugins determines whether the Reservation can participate in the Reserve
func (ext *frameworkExtenderImpl) RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status {
func (ext *frameworkExtenderImpl) RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
for _, pl := range ext.reservationFilterPlugins {
status := pl.FilterWithReservation(ctx, cycleState, pod, reservationInfo, nodeInfo)
if !status.IsSuccess() {
if debugFilterFailure {
klog.Infof("Failed to FilterWithReservation for Pod %q with Reservation %q on Node %q, failedPlugin: %s, reason: %s", klog.KObj(pod), klog.KObj(reservationInfo), nodeInfo.Node().Name, pl.Name(), status.Message())
}
return status
}
}
return nil
}

// RunNominateReservationFilterPlugins determines whether the Reservation can participate in the Reserve.
func (ext *frameworkExtenderImpl) RunNominateReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status {
for _, pl := range ext.reservationFilterPlugins {
status := pl.FilterReservation(ctx, cycleState, pod, reservationInfo, nodeName)
if !status.IsSuccess() {
Expand Down
95 changes: 93 additions & 2 deletions pkg/scheduler/frameworkext/framework_extender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,17 @@ type fakeReservationFilterPlugin struct {

func (f *fakeReservationFilterPlugin) Name() string { return "fakeReservationFilterPlugin" }

func (f *fakeReservationFilterPlugin) FilterWithReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationInfo.Reservation.Annotations == nil {
reservationInfo.Reservation.Annotations = map[string]string{}
}
reservationInfo.Reservation.Annotations[fmt.Sprintf("reservationFilterWithPlugin-%d", f.index)] = fmt.Sprintf("%d", f.index)
if f.err != nil {
return framework.AsStatus(f.err)
}
return nil
}

func (f *fakeReservationFilterPlugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status {
if reservationInfo.Reservation.Annotations == nil {
reservationInfo.Reservation.Annotations = map[string]string{}
Expand All @@ -750,7 +761,87 @@ func (f *fakeReservationFilterPlugin) FilterReservation(ctx context.Context, cyc
return nil
}

func TestReservationFilterPlugin(t *testing.T) {
func TestRunReservationFilterPlugins(t *testing.T) {
testNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
},
}
testNodeInfo := framework.NewNodeInfo()
testNodeInfo.SetNode(testNode)
tests := []struct {
name string
reservation *schedulingv1alpha1.Reservation
plugins []*fakeReservationFilterPlugin
wantAnnotations map[string]string
wantStatus bool
}{
{
name: "filter reservation succeeded",
reservation: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "test-reservation",
},
},
plugins: []*fakeReservationFilterPlugin{
{index: 1},
{index: 2},
},
wantAnnotations: map[string]string{
"reservationFilterWithPlugin-1": "1",
"reservationFilterWithPlugin-2": "2",
},
wantStatus: true,
},
{
name: "first plugin failed",
reservation: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "test-reservation",
},
},
plugins: []*fakeReservationFilterPlugin{
{index: 1, err: errors.New("failed")},
{index: 2},
},
wantAnnotations: map[string]string{
"reservationFilterWithPlugin-1": "1",
},
wantStatus: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
registeredPlugins := []schedulertesting.RegisterPluginFunc{
schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
}
fh, err := schedulertesting.NewFramework(
context.TODO(),
registeredPlugins,
"koord-scheduler",
)
assert.NoError(t, err)

extenderFactory, _ := NewFrameworkExtenderFactory()

extender := NewFrameworkExtender(extenderFactory, fh)
impl := extender.(*frameworkExtenderImpl)
for _, pl := range tt.plugins {
impl.updatePlugins(pl)
}

cycleState := framework.NewCycleState()

reservationInfo := NewReservationInfo(tt.reservation)
status := extender.RunReservationFilterPlugins(context.TODO(), cycleState, &corev1.Pod{}, reservationInfo, testNodeInfo)
assert.Equal(t, tt.wantStatus, status.IsSuccess())
assert.Equal(t, tt.wantAnnotations, reservationInfo.Reservation.Annotations)
})
}
}

func TestRunNominateReservationFilterPlugins(t *testing.T) {
tests := []struct {
name string
reservation *schedulingv1alpha1.Reservation
Expand Down Expand Up @@ -816,7 +907,7 @@ func TestReservationFilterPlugin(t *testing.T) {
cycleState := framework.NewCycleState()

reservationInfo := NewReservationInfo(tt.reservation)
status := extender.RunReservationFilterPlugins(context.TODO(), cycleState, &corev1.Pod{}, reservationInfo, "test-node-1")
status := extender.RunNominateReservationFilterPlugins(context.TODO(), cycleState, &corev1.Pod{}, reservationInfo, "test-node-1")
assert.Equal(t, tt.wantStatus, status.IsSuccess())
assert.Equal(t, tt.wantAnnotations, reservationInfo.Reservation.Annotations)
})
Expand Down
7 changes: 6 additions & 1 deletion pkg/scheduler/frameworkext/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ type FrameworkExtender interface {
// DEPRECATED: use RunReservationExtensionRestoreReservation instead.
RunReservationExtensionFinalRestoreReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, states PluginToNodeReservationRestoreStates) *framework.Status

RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status
RunNominateReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
RunReservationScorePlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfos []*ReservationInfo, nodeName string) (PluginToReservationScores, *framework.Status)

RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status
Expand Down Expand Up @@ -121,8 +122,12 @@ type ReservationRestorePlugin interface {

// ReservationFilterPlugin is an interface for Filter Reservation plugins.
// These plugins will be called during the Reserve phase to determine whether the Reservation can participate in the Reserve
// FilterWithReservation runs in the Filter phase for pre-check a reservation.
// FilterReservation runs in the PreScore or Reserve phase to nominate a reservation.
// TODO: merge FilterReservation into FilterWithReservation.
type ReservationFilterPlugin interface {
framework.Plugin
FilterWithReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status
FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/plugins/deviceshare/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
return status
}

func (p *Plugin) FilterWithReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
return nil
}

func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
state, status := getPreFilterState(cycleState)
if !status.IsSuccess() {
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/plugins/nodenumaresource/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,10 @@ func (p *Plugin) filterAmplifiedCPUs(podRequestMilliCPU int64, nodeInfo *framewo
return nil
}

func (p *Plugin) FilterWithReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
return nil
}

func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
state, status := getPreFilterState(cycleState)
if !status.IsSuccess() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/reservation/nominator.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (pl *Plugin) NominateReservation(ctx context.Context, cycleState *framework

reservations := make([]*frameworkext.ReservationInfo, 0, len(reservationInfos))
for i := range reservationInfos {
status := extender.RunReservationFilterPlugins(ctx, cycleState, pod, reservationInfos[i], nodeName)
status := extender.RunNominateReservationFilterPlugins(ctx, cycleState, pod, reservationInfos[i], nodeName)
if !status.IsSuccess() {
continue
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/scheduler/plugins/reservation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,11 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew
return nil
}

extender, ok := pl.handle.(frameworkext.FrameworkExtender)
if !ok {
return framework.AsStatus(fmt.Errorf("not implemented frameworkext.FrameworkExtender"))
}

node := nodeInfo.Node()
state := getStateData(cycleState)
nodeRState := state.nodeReservationStates[node.Name]
Expand All @@ -474,6 +479,14 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew
allInsufficientResourcesByNode := sets.NewString()
var allInsufficientResourceReasonsByReservation []string
for _, rInfo := range matchedReservations {
// Before nominating a reservation in PreScore or Reserve, check the reservation by multiple plugins to make
// the Filter phase give a more accurate result. It is extensible to support more policies.
status := extender.RunReservationFilterPlugins(ctx, cycleState, pod, rInfo, nodeInfo)
if !status.IsSuccess() {
allInsufficientResourceReasonsByReservation = append(allInsufficientResourceReasonsByReservation, status.Reasons()...)
continue
}

resourceNames := quotav1.Intersection(rInfo.ResourceNames, podRequestsResourceNames)
// NOTE: The reservation may not consider the irrelevant pods that have no matched resource names since it makes
// no sense in most cases but introduces a performance overhead. However, we allow pods to allocate reserved
Expand Down Expand Up @@ -781,6 +794,10 @@ func (pl *Plugin) makePostFilterReasons(state *stateData, filteredNodeStatusMap
return reasons
}

func (pl *Plugin) FilterWithReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
return nil
}

func (pl *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
// TODO(joseph): We can consider optimizing these codes. It seems that there is no need to exist at present.
state := getStateData(cycleState)
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/plugins/reservation/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1907,7 +1907,11 @@ func Test_filterWithReservations(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pl := &Plugin{}
suit := newPluginTestSuit(t)
p, err := suit.pluginFactory()
assert.NoError(t, err)
pl := p.(*Plugin)
suit.start()
cycleState := framework.NewCycleState()
if tt.stateData.podRequestsResources == nil {
resources := framework.NewResource(tt.stateData.podRequests)
Expand Down

0 comments on commit bc4666d

Please sign in to comment.