Skip to content

Commit

Permalink
scheduler: optimize scheduleCycle logic when notEnoughChildren
Browse files Browse the repository at this point in the history
Signed-off-by: wangjianyu.wjy <wangjianyu.wjy@alibaba-inc.com>
  • Loading branch information
wangjianyu.wjy committed Apr 18, 2024
1 parent 211250b commit 42d0009
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 60 deletions.
65 changes: 46 additions & 19 deletions pkg/scheduler/plugins/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ const (
PodGroupNotFound Status = "PodGroup not found"
Success Status = "Success"
Wait Status = "Wait"

stateKey = "CoScheduling"
)

// Manager defines the interfaces for PodGroup management.
type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
PreFilter(context.Context, *framework.CycleState, *corev1.Pod) (err error)
Permit(context.Context, *corev1.Pod) (time.Duration, Status)
PostBind(context.Context, *corev1.Pod, string)
PostFilter(context.Context, *corev1.Pod, framework.Handle, string, framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status)
PostFilter(context.Context, *framework.CycleState, *corev1.Pod, framework.Handle, string, framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status)
GetCreatTime(*framework.QueuedPodInfo) time.Time
GetGangGroupId(*corev1.Pod) (string, error)
GetAllPodsFromGang(string) []*corev1.Pod
Expand Down Expand Up @@ -221,32 +223,28 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
}
}

// ScheduleCycleInValidError is an error value whose text is carried verbatim
// in ErrMsg. Callers can distinguish it from other errors by its concrete type.
type ScheduleCycleInValidError struct {
	// ErrMsg is the message returned by Error.
	ErrMsg string
}

// Error implements the error interface and returns ErrMsg unchanged.
func (e *ScheduleCycleInValidError) Error() string {
	msg := e.ErrMsg
	return msg
}

// PreFilter
// i. Check whether the number of children in the Gang has reached the Gang's minimum number, and reject the pod if negative.
// ii. Check whether the Gang has been initialized, and reject the pod if negative.
// iii. Check whether the Gang is OnceResourceSatisfied.
// iv. Check whether the Gang has met the scheduleCycleValid check, and reject the pod if negative (Strict mode only).
// v. Try to update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above.
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) (err error) {
if !util.IsPodNeedGang(pod) {
return nil
}
preFilterState := &stateData{skipReject: false, skipSetCycleInvalid: false}
state.Write(stateKey, preFilterState)
gang := pgMgr.GetGangByPod(pod)
if gang == nil {
preFilterState.skipSetCycleInvalid = true
return fmt.Errorf("can't find gang, gangName: %v, podName: %v", util.GetId(pod.Namespace, util.GetGangNameByPod(pod)),
util.GetId(pod.Namespace, pod.Name))
}

// check if gang is initialized
if !gang.HasGangInit {
preFilterState.skipSetCycleInvalid = true
return fmt.Errorf("gang has not init, gangName: %v, podName: %v", gang.Name,
util.GetId(pod.Namespace, pod.Name))
}
Expand All @@ -257,6 +255,7 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er

// check minNum
if gang.getChildrenNum() < gang.getGangMinNum() {
preFilterState.skipSetCycleInvalid = true
return fmt.Errorf("gang child pod not collect enough, gangName: %v, podName: %v", gang.Name,
util.GetId(pod.Namespace, pod.Name))
}
Expand All @@ -279,10 +278,9 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
}
podScheduleCycle := gang.getChildScheduleCycle(pod)
if !gang.isScheduleCycleValid() {
err := &ScheduleCycleInValidError{}
err.ErrMsg = fmt.Sprintf("gang scheduleCycle not valid, gangName: %v, podName: %v",
preFilterState.skipReject = true
return fmt.Errorf("gang scheduleCycle not valid, gangName: %v, podName: %v",
gang.Name, util.GetId(pod.Namespace, pod.Name))
return err
}
if podScheduleCycle >= gangScheduleCycle {
return fmt.Errorf("pod's schedule cycle too large, gangName: %v, podName: %v, podCycle: %v, gangCycle: %v",
Expand All @@ -292,10 +290,32 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
return nil
}

// stateData holds per-scheduling-cycle flags that PreFilter writes into the
// CycleState under stateKey and that PostFilter reads back.
type stateData struct {
// skipReject is set by PreFilter when it fails because the gang's schedule
// cycle is not valid. In strict mode PostFilter then returns Unschedulable
// without rejecting the gang group.
skipReject bool
// skipSetCycleInvalid is set by PreFilter when the gang cannot be found, has
// not been initialized, or has fewer children than its minimum number.
// PostFilter forwards it to rejectGangGroupById, which returns before its
// final per-gang loop when the flag is true.
skipSetCycleInvalid bool
}

// Clone returns a newly allocated stateData carrying the same flag values as
// the receiver, so the copy can be modified without affecting the original.
func (s *stateData) Clone() framework.StateData {
	// stateData holds only bool fields, so a value copy duplicates every flag.
	dup := *s
	return &dup
}

// getPreFilterState returns the *stateData stored in cycleState under stateKey.
//
// It returns nil when cycleState is nil, when nothing can be read for the key,
// or when the stored value is not a *stateData, so callers only need a nil
// check before using the result.
func getPreFilterState(stateKey string, cycleState *framework.CycleState) *stateData {
	if cycleState == nil {
		return nil
	}
	value, err := cycleState.Read(framework.StateKey(stateKey))
	if err != nil {
		return nil
	}
	// Comma-ok assertion: a bare assertion would panic if the key ever held a
	// value of a different concrete type.
	state, ok := value.(*stateData)
	if !ok {
		return nil
	}
	return state
}

// PostFilter
// i. If strict-mode, we will set scheduleCycleValid to false and release all assumed pods.
// ii. If non-strict mode, we will do nothing.
func (pgMgr *PodGroupManager) PostFilter(ctx context.Context, pod *corev1.Pod, handle framework.Handle, pluginName string, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
func (pgMgr *PodGroupManager) PostFilter(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, handle framework.Handle, pluginName string, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
if !util.IsPodNeedGang(pod) {
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
}
Expand All @@ -310,6 +330,10 @@ func (pgMgr *PodGroupManager) PostFilter(ctx context.Context, pod *corev1.Pod, h
}

if gang.getGangMode() == extension.GangModeStrict {
preFilterState := getPreFilterState(stateKey, state)
if preFilterState != nil && preFilterState.skipReject {
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
}
nodeInfos, _ := handle.SnapshotSharedLister().NodeInfos().List()
fitErr := &framework.FitError{
Pod: pod,
Expand All @@ -319,7 +343,8 @@ func (pgMgr *PodGroupManager) PostFilter(ctx context.Context, pod *corev1.Pod, h
},
}
message := fmt.Sprintf("Gang %q gets rejected due to member Pod %q is unschedulable with reason %q", gang.Name, pod.Name, fitErr)
pgMgr.rejectGangGroupById(handle, pluginName, gang.Name, message)

pgMgr.rejectGangGroupById(handle, preFilterState != nil && preFilterState.skipSetCycleInvalid, pluginName, gang.Name, message)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("Gang %q gets rejected due to pod is unschedulable", gang.Name))
}
Expand Down Expand Up @@ -377,11 +402,11 @@ func (pgMgr *PodGroupManager) Unreserve(ctx context.Context, state *framework.Cy
if !(gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied()) &&
gang.getGangMode() == extension.GangModeStrict {
message := fmt.Sprintf("Gang %q gets rejected due to Pod %q in Unreserve", gang.Name, pod.Name)
pgMgr.rejectGangGroupById(handle, pluginName, gang.Name, message)
pgMgr.rejectGangGroupById(handle, false, pluginName, gang.Name, message)
}
}

func (pgMgr *PodGroupManager) rejectGangGroupById(handle framework.Handle, pluginName, gangId, message string) {
func (pgMgr *PodGroupManager) rejectGangGroupById(handle framework.Handle, skipSetCycleInvalid bool, pluginName, gangId, message string) {
gang := pgMgr.cache.getGangFromCacheByGangId(gangId, false)
if gang == nil {
return
Expand All @@ -401,7 +426,9 @@ func (pgMgr *PodGroupManager) rejectGangGroupById(handle framework.Handle, plugi
}
})
}

if skipSetCycleInvalid {
return
}
for gang := range gangSet {
gangIns := pgMgr.cache.getGangFromCacheByGangId(gang, false)
if gangIns != nil {
Expand Down
33 changes: 27 additions & 6 deletions pkg/scheduler/plugins/coscheduling/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/scheduler/framework"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1"
fakepgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned/fake"
Expand Down Expand Up @@ -115,25 +116,25 @@ func TestPlugin_PreFilter_ResetScheduleTime(t *testing.T) {
assert.Equal(t, lastScheduleTime1, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"])
assert.Equal(t, lastScheduleTime1, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"])

mgr.PreFilter(context.TODO(), pod1)
mgr.PreFilter(context.TODO(), framework.NewCycleState(), pod1)
lastScheduleTime2 := gang.GangGroupInfo.LastScheduleTime
assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime))
assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"])
assert.Equal(t, lastScheduleTime1, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"])

mgr.PreFilter(context.TODO(), pod1)
mgr.PreFilter(context.TODO(), framework.NewCycleState(), pod1)
lastScheduleTime2 = gang.GangGroupInfo.LastScheduleTime
assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime))
assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"])
assert.Equal(t, lastScheduleTime1, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"])

mgr.PreFilter(context.TODO(), pod2)
mgr.PreFilter(context.TODO(), framework.NewCycleState(), pod2)
lastScheduleTime2 = gang.GangGroupInfo.LastScheduleTime
assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime))
assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"])
assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"])

mgr.PreFilter(context.TODO(), pod2)
mgr.PreFilter(context.TODO(), framework.NewCycleState(), pod2)
lastScheduleTime3 := gang.GangGroupInfo.LastScheduleTime
assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime))
assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"])
Expand All @@ -156,6 +157,7 @@ func TestPlugin_PreFilter(t *testing.T) {
expectedChildCycleMap map[string]int
expectedScheduleCycle int
expectedScheduleCycleValid bool
expectStateData *stateData
// case value
// next two are set before pg created
totalNum int
Expand All @@ -181,6 +183,9 @@ func TestPlugin_PreFilter(t *testing.T) {
},
expectedScheduleCycleValid: true,
expectedScheduleCycle: 1,
expectStateData: &stateData{
skipSetCycleInvalid: true,
},
},
{
name: "gang ResourceSatisfied",
Expand All @@ -190,6 +195,7 @@ func TestPlugin_PreFilter(t *testing.T) {
expectedScheduleCycleValid: true,
expectedScheduleCycle: 1,
resourceSatisfied: true,
expectStateData: &stateData{},
},
{
name: "pod count less than minMember",
Expand All @@ -202,6 +208,9 @@ func TestPlugin_PreFilter(t *testing.T) {
expectedScheduleCycle: 1,
expectedChildCycleMap: map[string]int{},
expectedScheduleCycleValid: true,
expectStateData: &stateData{
skipSetCycleInvalid: true,
},
},
{
name: "pods count equal with minMember,but is NonStrictMode",
Expand All @@ -214,6 +223,7 @@ func TestPlugin_PreFilter(t *testing.T) {
pgs: makePg("gangb", "gangb_ns", 4, &gangACreatedTime, nil),
expectedErrorMessage: "",
isNonStrictMode: true,
expectStateData: &stateData{},
},
{
name: "due to reschedule pod6's podScheduleCycle is equal with the gangScheduleCycle",
Expand All @@ -232,6 +242,7 @@ func TestPlugin_PreFilter(t *testing.T) {
},
expectedErrorMessage: "pod's schedule cycle too large, gangName: ganga_ns/gangc, podName: ganga_ns/pod6, podCycle: 1, gangCycle: 1",
expectedScheduleCycleValid: true,
expectStateData: &stateData{},
},
{
name: "due to reschedule pod6's podScheduleCycle is equal with the gangScheduleCycle, but pod6's nominatedNodeName is not empty",
Expand All @@ -254,6 +265,7 @@ func TestPlugin_PreFilter(t *testing.T) {
},
expectedErrorMessage: "",
expectedScheduleCycleValid: true,
expectStateData: &stateData{},
},
{
name: "pods count equal with minMember,is StrictMode,but the gang's scheduleCycle is not valid due to pre pod Filter Failed",
Expand All @@ -271,6 +283,9 @@ func TestPlugin_PreFilter(t *testing.T) {
expectedScheduleCycleValid: false,
expectedErrorMessage: "gang scheduleCycle not valid, gangName: ganga_ns/gangd, podName: ganga_ns/pod7",
shouldSetValidToFalse: true,
expectStateData: &stateData{
skipReject: true,
},
},
{
name: "pods count equal with minMember,is StrictMode, disable check scheduleCycle even if the gang's scheduleCycle is not valid",
Expand All @@ -289,6 +304,7 @@ func TestPlugin_PreFilter(t *testing.T) {
expectedErrorMessage: "",
shouldSetValidToFalse: true,
shouldSkipCheckScheduleCycle: true,
expectStateData: &stateData{},
},
{
name: "pods count equal with minMember,is StrictMode,scheduleCycle valid,but childrenNum is not reach to total num",
Expand All @@ -306,6 +322,7 @@ func TestPlugin_PreFilter(t *testing.T) {
},
expectedScheduleCycleValid: true,
expectedErrorMessage: "",
expectStateData: &stateData{},
},
{
name: "pods count more than minMember,is StrictMode,scheduleCycle valid,and childrenNum reach to total num",
Expand All @@ -327,6 +344,7 @@ func TestPlugin_PreFilter(t *testing.T) {
},
expectedErrorMessage: "",
expectedScheduleCycleValid: true,
expectStateData: &stateData{},
},
}
for _, tt := range tests {
Expand All @@ -352,7 +370,7 @@ func TestPlugin_PreFilter(t *testing.T) {
// add each neighbor pods and run preFilter
for _, pod := range tt.pods {
mgr.cache.onPodAdd(pod)
mgr.PreFilter(ctx, pod)
mgr.PreFilter(ctx, framework.NewCycleState(), pod)
}
mgr.cache.onPodAdd(tt.pod)

Expand All @@ -373,13 +391,16 @@ func TestPlugin_PreFilter(t *testing.T) {
}()
}
// run the case
err := mgr.PreFilter(ctx, tt.pod)
cycleState := framework.NewCycleState()
err := mgr.PreFilter(ctx, cycleState, tt.pod)
var returnMessage string
if err == nil {
returnMessage = ""
} else {
returnMessage = err.Error()
}
preFilterState := getPreFilterState(stateKey, cycleState)
assert.Equal(t, tt.expectStateData, preFilterState)
// assert
assert.Equal(t, tt.expectedErrorMessage, returnMessage)
if gang != nil && !tt.isNonStrictMode && !tt.shouldSkipCheckScheduleCycle {
Expand Down
39 changes: 4 additions & 35 deletions pkg/scheduler/plugins/coscheduling/coscheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ var _ framework.EnqueueExtensions = &Coscheduling{}

const (
// Name is the name of the plugin used in Registry and configurations.
Name = "Coscheduling"
stateKey = Name
Name = "Coscheduling"
)

// New initializes and returns a new Coscheduling plugin.
Expand Down Expand Up @@ -168,49 +167,19 @@ func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool {
// iii.Check whether the Gang has met the scheduleCycleValid check, and reject the pod if negative.
// iv.Try update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above.
func (cs *Coscheduling) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
// If PreFilter fails, return framework.Error to avoid
// any preemption attempts.
if err := cs.pgMgr.PreFilter(ctx, pod); err != nil {
// If PreFilter failed due to an invalid scheduleCycle, we shouldn't reject its assumed siblings.
if _, ok := err.(*core.ScheduleCycleInValidError); ok {
state.Write(stateKey, &stateData{skipPostFilter: true})
}

// If PreFilter fails, return framework.UnschedulableAndUnresolvable to avoid any preemption attempts.
if err := cs.pgMgr.PreFilter(ctx, state, pod); err != nil {
klog.ErrorS(err, "PreFilter failed", "pod", klog.KObj(pod))
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
return nil, framework.NewStatus(framework.Success, "")
}

// stateData is the per-scheduling-cycle state that PreFilter writes into the
// CycleState under stateKey.
type stateData struct {
// skipPostFilter is set when PreFilter fails with a
// *core.ScheduleCycleInValidError. PostFilter then returns Unschedulable
// immediately instead of delegating to the PodGroup manager.
skipPostFilter bool
}

// Clone returns a newly allocated stateData carrying the same flag value as
// the receiver, so the copy can be modified without affecting the original.
func (s *stateData) Clone() framework.StateData {
	// stateData holds a single bool field, so a value copy duplicates it.
	dup := *s
	return &dup
}

// getPreFilterState returns the *stateData stored in cycleState under stateKey.
//
// It returns nil when cycleState is nil, when nothing can be read for the key,
// or when the stored value is not a *stateData, so callers only need a nil
// check before using the result.
func getPreFilterState(cycleState *framework.CycleState) *stateData {
	if cycleState == nil {
		return nil
	}
	value, err := cycleState.Read(stateKey)
	if err != nil {
		return nil
	}
	// Comma-ok assertion: a bare assertion would panic if the key ever held a
	// value of a different concrete type.
	state, ok := value.(*stateData)
	if !ok {
		return nil
	}
	return state
}

// PostFilter
// i. If strict-mode, we will set scheduleCycleValid to false and release all assumed pods.
// ii. If non-strict mode, we will do nothing.
func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
preFilterState := getPreFilterState(state)
if preFilterState != nil && preFilterState.skipPostFilter {
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
}
return cs.pgMgr.PostFilter(ctx, pod, cs.frameworkHandler, Name, filteredNodeStatusMap)
return cs.pgMgr.PostFilter(ctx, state, pod, cs.frameworkHandler, Name, filteredNodeStatusMap)
}

// PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one.
Expand Down

0 comments on commit 42d0009

Please sign in to comment.