Skip to content

Commit

Permalink
scheduler: fix reservation nominator residual bug
Browse files Browse the repository at this point in the history
Signed-off-by: xulinfei.xlf <xulinfei.xlf@alibaba-inc.com>
  • Loading branch information
xulinfei.xlf committed Apr 7, 2024
1 parent 2dc8735 commit 8f0cb83
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 7 deletions.
13 changes: 10 additions & 3 deletions pkg/scheduler/plugins/reservation/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
koordinatorinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions"
Expand All @@ -29,12 +30,15 @@ import (
)

type reservationEventHandler struct {
cache *reservationCache
cache *reservationCache
rrNominator *nominator
}

func registerReservationEventHandler(cache *reservationCache, koordinatorInformerFactory koordinatorinformers.SharedInformerFactory) {
func registerReservationEventHandler(cache *reservationCache, koordinatorInformerFactory koordinatorinformers.SharedInformerFactory,
rrNominator *nominator) {
eventHandler := &reservationEventHandler{
cache: cache,
cache: cache,
rrNominator: rrNominator,
}
reservationInformer := koordinatorInformerFactory.Scheduling().V1alpha1().Reservations().Informer()
frameworkexthelper.ForceSyncFromInformer(context.TODO().Done(), koordinatorInformerFactory, reservationInformer, eventHandler)
Expand Down Expand Up @@ -63,6 +67,7 @@ func (h *reservationEventHandler) OnUpdate(oldObj, newObj interface{}) {

if reservationutil.IsReservationActive(newR) || reservationutil.IsReservationFailed(newR) || reservationutil.IsReservationSucceeded(newR) {
h.cache.updateReservation(newR)
h.rrNominator.DeleteReservePod(framework.NewPodInfo(reservationutil.NewReservePod(newR)))
klog.V(4).InfoS("update reservation into reservationCache", "reservation", klog.KObj(newR))
}
}
Expand All @@ -83,6 +88,8 @@ func (h *reservationEventHandler) OnDelete(obj interface{}) {
return
}

h.rrNominator.DeleteReservePod(framework.NewPodInfo(reservationutil.NewReservePod(r)))

// Here it is only marked that ReservationInfo is unavailable,
// and the real deletion operation is executed in deleteReservationFromCache(pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go).
// This ensures that the Reserve Pod and the resources it holds are deleted correctly.
Expand Down
9 changes: 7 additions & 2 deletions pkg/scheduler/plugins/reservation/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/pkg/scheduler/framework"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/util/reservation"
)

func TestEventHandlerOnAdd(t *testing.T) {
Expand Down Expand Up @@ -194,7 +196,7 @@ func TestEventHandlerUpdate(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := newReservationCache(nil)
eh := &reservationEventHandler{cache: cache}
eh := &reservationEventHandler{cache: cache, rrNominator: newNominator()}
eh.OnUpdate(tt.oldReservation, tt.newReservation)
if tt.wantReservation == nil {
rInfo := cache.getReservationInfoByUID(tt.newReservation.UID)
Expand Down Expand Up @@ -239,12 +241,15 @@ func TestEventHandlerDelete(t *testing.T) {
},
}
cache := newReservationCache(nil)
eh := &reservationEventHandler{cache: cache}
eh := &reservationEventHandler{cache: cache, rrNominator: newNominator()}
eh.OnAdd(activeReservation)
rInfo := cache.getReservationInfoByUID(activeReservation.UID)
assert.NotNil(t, rInfo)
eh.rrNominator.AddNominatedReservePod(framework.NewPodInfo(reservation.NewReservePod(activeReservation)), "test-node")
assert.Equal(t, []*framework.PodInfo{framework.NewPodInfo(reservation.NewReservePod(activeReservation))}, eh.rrNominator.NominatedReservePodForNode("test-node"))
eh.OnDelete(activeReservation)
rInfo = cache.getReservationInfoByUID(activeReservation.UID)
assert.NotNil(t, rInfo)
assert.False(t, rInfo.IsAvailable())
assert.Equal(t, []*framework.PodInfo{}, eh.rrNominator.NominatedReservePodForNode("test-node"))
}
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/reservation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error)
koordSharedInformerFactory := extendedHandle.KoordinatorSharedInformerFactory()
reservationLister := koordSharedInformerFactory.Scheduling().V1alpha1().Reservations().Lister()
cache := newReservationCache(reservationLister)
registerReservationEventHandler(cache, koordSharedInformerFactory)
nominator := newNominator()
registerReservationEventHandler(cache, koordSharedInformerFactory, nominator)
registerPodEventHandler(cache, nominator, sharedInformerFactory)

// TODO(joseph): Considering the amount of changed code,
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/plugins/reservation/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ func (pl *Plugin) BeforeFilter(ctx context.Context, cycleState *framework.CycleS
if !status.IsSuccess() {
return pod, nodeInfo, false, status
}
klog.V(4).Infof("toschedule reservation: %s, added reservation: %s",
klog.V(4).Infof("nodeName: %s,toschedule reservation: %s, added reservation: %s",
nodeInfo.Node().Name,
reservationutil.GetReservationNameFromReservePod(pod),
reservationutil.GetReservationNameFromReservePod(rInfo.Pod))
}
Expand Down

0 comments on commit 8f0cb83

Please sign in to comment.