Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: extend reservation nominator to support reservation preemption #1936

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,18 @@ func MakeReservationErrorHandler(
}

// if the pod is not a reserve pod, use the default error handler
if !reservationutil.IsReservePod(pod) {
// If the Pod failed to schedule or no post-filter plugins, should remove exist NominatedReservation of the Pod.
if _, ok := schedulingErr.(*framework.FitError); !ok || !fwk.HasPostFilterPlugins() {
extendedHandle := fwk.(frameworkext.ExtendedHandle)
extendedHandle.GetReservationNominator().RemoveNominatedReservations(pod)
// If the Pod failed to schedule or no post-filter plugins, should remove exist NominatedReservation of the Pod.
if _, ok := schedulingErr.(*framework.FitError); !ok || !fwk.HasPostFilterPlugins() {
if extendedHandle, ok := fwk.(frameworkext.ExtendedHandle); ok {
if !reservationutil.IsReservePod(pod) {
extendedHandle.GetReservationNominator().RemoveNominatedReservations(pod)
} else {
extendedHandle.GetReservationNominator().DeleteNominatedReservePod(pod)
}
}
}

if !reservationutil.IsReservePod(pod) {
return false
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (ext *frameworkExtenderImpl) RunPostFilterPlugins(ctx context.Context, stat
result, status := ext.Framework.RunPostFilterPlugins(ctx, state, pod, filteredNodeStatusMap)
if result == nil || result.NominatingInfo.NominatedNodeName == "" {
ext.GetReservationNominator().RemoveNominatedReservations(pod)
ext.GetReservationNominator().DeleteNominatedReservePod(pod)
}
return result, status
}
Expand Down Expand Up @@ -461,6 +462,7 @@ func (ext *frameworkExtenderImpl) RunReservePluginsReserve(ctx context.Context,
}
status := ext.Framework.RunReservePluginsReserve(ctx, cycleState, pod, nodeName)
ext.GetReservationNominator().RemoveNominatedReservations(pod)
ext.GetReservationNominator().DeleteNominatedReservePod(pod)
return status
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/frameworkext/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ type ReservationNominator interface {
AddNominatedReservation(pod *corev1.Pod, nodeName string, rInfo *ReservationInfo)
RemoveNominatedReservations(pod *corev1.Pod)
GetNominatedReservation(pod *corev1.Pod, nodeName string) *ReservationInfo
AddNominatedReservePod(reservePod *corev1.Pod, nodeName string)
DeleteNominatedReservePod(reservePod *corev1.Pod)
}

const (
Expand Down
48 changes: 47 additions & 1 deletion pkg/scheduler/frameworkext/testing/fake_reservation_nominator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
Expand All @@ -34,7 +35,10 @@ type FakeNominator struct {
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated.
nominatedPodToNode map[types.UID]map[string]types.UID
reservations map[types.UID]*frameworkext.ReservationInfo
lock sync.RWMutex
// nominatedReservePod is map keyed by nodeName, value is the nominated reservations
nominatedReservePod map[string][]*framework.PodInfo
nominatedReservePodToNode map[types.UID]string
lock sync.RWMutex
}

func NewFakeReservationNominator() *FakeNominator {
Expand Down Expand Up @@ -87,3 +91,45 @@ func (nm *FakeNominator) NominateReservation(ctx context.Context, cycleState *fr
rInfo := nm.GetNominatedReservation(pod, nodeName)
return rInfo, nil
}

func (nm *FakeNominator) AddNominatedReservePod(rInfo *corev1.Pod, nodeName string) {
nm.lock.Lock()
defer nm.lock.Unlock()

// Always delete the reservation if it already exists, to ensure we never store more than
// one instance of the reservation.
nm.deleteReservePod(rInfo)

nm.nominatedReservePodToNode[rInfo.UID] = nodeName
for _, npi := range nm.nominatedReservePod[nodeName] {
if npi.Pod.UID == rInfo.UID {
klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod))
return
}
}
nm.nominatedReservePod[nodeName] = append(nm.nominatedReservePod[nodeName], framework.NewPodInfo(rInfo))
}

func (nm *FakeNominator) DeleteNominatedReservePod(rInfo *corev1.Pod) {
nm.lock.Lock()
defer nm.lock.Unlock()

nm.deleteReservePod(rInfo)
}

func (nm *FakeNominator) deleteReservePod(rInfo *corev1.Pod) {
nnn, ok := nm.nominatedReservePodToNode[rInfo.UID]
if !ok {
return
}
for i, np := range nm.nominatedReservePod[nnn] {
if np.Pod.UID == rInfo.UID {
nm.nominatedReservePod[nnn] = append(nm.nominatedReservePod[nnn][:i], nm.nominatedReservePod[nnn][i+1:]...)
if len(nm.nominatedReservePod[nnn]) == 0 {
delete(nm.nominatedReservePod, nnn)
}
break
}
}
delete(nm.nominatedReservePodToNode, rInfo.UID)
}
70 changes: 68 additions & 2 deletions pkg/scheduler/plugins/reservation/nominator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,17 @@ import (
type nominator struct {
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated.
nominatedPodToNode map[types.UID]map[string]types.UID
lock sync.RWMutex
// nominatedReservePod is map keyed by nodeName, value is the nominated reservation's PodInfo
nominatedReservePod map[string][]*framework.PodInfo
nominatedReservePodToNode map[types.UID]string
lock sync.RWMutex
}

func newNominator() *nominator {
return &nominator{
nominatedPodToNode: map[types.UID]map[string]types.UID{},
nominatedPodToNode: map[types.UID]map[string]types.UID{},
nominatedReservePod: map[string][]*framework.PodInfo{},
nominatedReservePodToNode: map[types.UID]string{},
}
}

Expand All @@ -58,6 +63,59 @@ func (nm *nominator) AddNominatedReservation(pod *corev1.Pod, nodeName string, r
nodeToReservation[nodeName] = rInfo.UID()
}

func (nm *nominator) AddNominatedReservePod(pi *framework.PodInfo, nodeName string) {
nm.lock.Lock()
defer nm.lock.Unlock()

// Always delete the reservation if it already exists, to ensure we never store more than
// one instance of the reservation.
nm.deleteReservePod(pi)

nm.nominatedReservePodToNode[pi.Pod.UID] = nodeName
for _, npi := range nm.nominatedReservePod[nodeName] {
if npi.Pod.UID == pi.Pod.UID {
klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod))
return
}
}
nm.nominatedReservePod[nodeName] = append(nm.nominatedReservePod[nodeName], pi)
}

func (nm *nominator) NominatedReservePodForNode(nodeName string) []*framework.PodInfo {
nm.lock.RLock()
defer nm.lock.RUnlock()
// Make a copy of the nominated Pods so the caller can mutate safely.
reservePods := make([]*framework.PodInfo, len(nm.nominatedReservePod[nodeName]))
for i := 0; i < len(reservePods); i++ {
reservePods[i] = nm.nominatedReservePod[nodeName][i].DeepCopy()
}
return reservePods
}

func (nm *nominator) DeleteReservePod(pi *framework.PodInfo) {
nm.lock.Lock()
defer nm.lock.Unlock()

nm.deleteReservePod(pi)
}

func (nm *nominator) deleteReservePod(pi *framework.PodInfo) {
nnn, ok := nm.nominatedReservePodToNode[pi.Pod.UID]
if !ok {
return
}
for i, np := range nm.nominatedReservePod[nnn] {
if np.Pod.UID == pi.Pod.UID {
nm.nominatedReservePod[nnn] = append(nm.nominatedReservePod[nnn][:i], nm.nominatedReservePod[nnn][i+1:]...)
if len(nm.nominatedReservePod[nnn]) == 0 {
delete(nm.nominatedReservePod, nnn)
}
break
}
}
delete(nm.nominatedReservePodToNode, pi.Pod.UID)
}

func (nm *nominator) RemoveNominatedReservation(pod *corev1.Pod) {
nm.lock.Lock()
defer nm.lock.Unlock()
Expand Down Expand Up @@ -141,6 +199,14 @@ func (pl *Plugin) RemoveNominatedReservations(pod *corev1.Pod) {
pl.nominator.RemoveNominatedReservation(pod)
}

func (pl *Plugin) AddNominatedReservePod(pod *corev1.Pod, nodeName string) {
pl.nominator.AddNominatedReservePod(framework.NewPodInfo(pod), nodeName)
}

func (pl *Plugin) DeleteNominatedReservePod(pod *corev1.Pod) {
pl.nominator.DeleteReservePod(framework.NewPodInfo(pod))
}

func (pl *Plugin) GetNominatedReservation(pod *corev1.Pod, nodeName string) *frameworkext.ReservationInfo {
reservationID := pl.nominator.GetNominatedReservation(pod, nodeName)
if reservationID == "" {
Expand Down
55 changes: 55 additions & 0 deletions pkg/scheduler/plugins/reservation/nominator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,58 @@ func TestMultiReservationsOnSameNode(t *testing.T) {
assert.Equal(t, 1, v)
}
}

func TestReservationsNominator(t *testing.T) {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: corev1.NodeStatus{
Allocatable: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("96"),
corev1.ResourceMemory: resource.MustParse("1886495404Ki"),
},
},
}

resourceList := corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("16"),
corev1.ResourceMemory: resource.MustParse("32Gi"),
}
labels := map[string]string{
"foo": "bar",
}
suit := newPluginTestSuitWith(t, nil, []*corev1.Node{node})
var pods []*corev1.Pod
for i := 0; i < 3; i++ {
r := newTestReservation(t, fmt.Sprintf("test-r-%d", i), labels, labels, node.Name, resourceList)
pods = append(pods, reservationutil.NewReservePod(r))
_, err := suit.extenderFactory.KoordinatorClientSet().SchedulingV1alpha1().Reservations().Create(context.TODO(), r, metav1.CreateOptions{})
assert.NoError(t, err)
}
nodeInfo, err := suit.fw.SnapshotSharedLister().NodeInfos().Get(node.Name)
assert.NoError(t, err)
assert.Equal(t, 0, len(nodeInfo.Pods))

p, err := suit.pluginFactory()
assert.NoError(t, err)
pl := p.(*Plugin)

nominatorImpl := pl.handle.(frameworkext.FrameworkExtender).GetReservationNominator()

nominatorImpl.AddNominatedReservePod(pods[0], "node-1")
ctx := context.TODO()
state := framework.NewCycleState()
pod, nodeInfoOut, update, status := pl.BeforeFilter(ctx, state, pods[2], nodeInfo)
assert.Equal(t, pod, pods[2])
assert.True(t, update)
assert.True(t, status.IsSuccess())
assert.Equal(t, 1, len(nodeInfoOut.Pods))

nominatorImpl.AddNominatedReservePod(pods[1], "node-1")
pod, nodeInfoOut, update, status = pl.BeforeFilter(ctx, state, pods[2], nodeInfo)
assert.Equal(t, pod, pods[2])
assert.True(t, update)
assert.True(t, status.IsSuccess())
assert.Equal(t, 2, len(nodeInfoOut.Pods))
}
3 changes: 3 additions & 0 deletions pkg/scheduler/plugins/reservation/pod_eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
frameworkexthelper "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper"
Expand Down Expand Up @@ -95,6 +96,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) {
}

h.nominator.RemoveNominatedReservation(newPod)
h.nominator.DeleteReservePod(framework.NewPodInfo(newPod))

var reservationUID types.UID
if oldPod != nil {
Expand Down Expand Up @@ -128,6 +130,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) {

func (h *podEventHandler) deletePod(pod *corev1.Pod) {
h.nominator.RemoveNominatedReservation(pod)
h.nominator.DeleteReservePod(framework.NewPodInfo(pod))

reservationAllocated, err := apiext.GetReservationAllocated(pod)
if err == nil && reservationAllocated != nil && reservationAllocated.UID != "" {
Expand Down
48 changes: 48 additions & 0 deletions pkg/scheduler/plugins/reservation/pod_eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/pkg/scheduler/framework"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
Expand Down Expand Up @@ -75,20 +76,38 @@ func TestPodEventHandler(t *testing.T) {
},
}

handler.nominator.AddNominatedReservePod(framework.NewPodInfo(pod), "test-node-1")
assert.Equal(t, "test-node-1", handler.nominator.nominatedReservePodToNode[pod.UID])
assert.Equal(t, []*framework.PodInfo{
framework.NewPodInfo(pod),
}, handler.nominator.nominatedReservePod["test-node-1"])
handler.OnAdd(pod)
rInfo := handler.cache.getReservationInfoByUID(reservationUID)
assert.Empty(t, rInfo.AssignedPods)
// pod not assigned, no need to delete reservation nominated node
assert.Equal(t, "test-node-1", handler.nominator.nominatedReservePodToNode[pod.UID])
assert.Equal(t, []*framework.PodInfo{
framework.NewPodInfo(pod),
}, handler.nominator.nominatedReservePod["test-node-1"])

newPod := pod.DeepCopy()
apiext.SetReservationAllocated(newPod, reservation)
handler.OnUpdate(pod, newPod)
rInfo = handler.cache.getReservationInfoByUID(reservationUID)
assert.Len(t, rInfo.AssignedPods, 0)
// pod not assigned, no need to delete reservation nominated node
assert.Equal(t, "test-node-1", handler.nominator.nominatedReservePodToNode[pod.UID])
assert.Equal(t, []*framework.PodInfo{
framework.NewPodInfo(pod),
}, handler.nominator.nominatedReservePod["test-node-1"])

newPod.Spec.NodeName = reservation.Status.NodeName
handler.OnUpdate(pod, newPod)
rInfo = handler.cache.getReservationInfoByUID(reservationUID)
assert.Len(t, rInfo.AssignedPods, 1)
// pod assigned, delete reservation nominated node
assert.Equal(t, "", handler.nominator.nominatedReservePodToNode[pod.UID])
assert.Equal(t, []*framework.PodInfo(nil), handler.nominator.nominatedReservePod["test-node-1"])

expectPodRequirement := &frameworkext.PodRequirement{
Name: pod.Name,
Expand All @@ -100,9 +119,38 @@ func TestPodEventHandler(t *testing.T) {
}
assert.Equal(t, expectPodRequirement, rInfo.AssignedPods[pod.UID])

handler.nominator.nominatedReservePod["test-node-1"] = []*framework.PodInfo{
framework.NewPodInfo(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-1",
UID: "test-1",
},
}),
}
handler.nominator.AddNominatedReservePod(framework.NewPodInfo(newPod), "test-node-1")
assert.Equal(t, "test-node-1", handler.nominator.nominatedReservePodToNode[newPod.UID])
assert.Equal(t, []*framework.PodInfo{
framework.NewPodInfo(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-1",
UID: "test-1",
},
}),
framework.NewPodInfo(newPod),
}, handler.nominator.nominatedReservePod["test-node-1"])

handler.OnDelete(newPod)
rInfo = handler.cache.getReservationInfoByUID(reservationUID)
assert.Empty(t, rInfo.AssignedPods)
assert.Equal(t, "", handler.nominator.nominatedReservePodToNode[newPod.UID])
assert.Equal(t, []*framework.PodInfo{
framework.NewPodInfo(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-1",
UID: "test-1",
},
}),
}, handler.nominator.nominatedReservePod["test-node-1"])
}

func TestPodEventHandlerWithOperatingPod(t *testing.T) {
Expand Down
Loading
Loading