Skip to content

Commit

Permalink
Handle unschedulables pods and always start from reserved no matter w…
Browse files Browse the repository at this point in the history
…hat is placements

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Oct 25, 2024
1 parent 143aa04 commit c6dfcf1
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 25 deletions.
73 changes: 58 additions & 15 deletions pkg/scheduler/statefulset/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,35 @@ var (

// Promote implements reconciler.LeaderAware.
func (s *StatefulSetScheduler) Promote(b reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error {
if !b.Has(ephemeralLeaderElectionObject) {
return nil
}

if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
return v.Promote(b, enq)
}
if err := s.initReserved(); err != nil {
return err
}
return nil
}

func (s *StatefulSetScheduler) initReserved() error {
s.reservedMu.Lock()
defer s.reservedMu.Unlock()

vPods, err := s.vpodLister()
if err != nil {
return fmt.Errorf("failed to list vPods during init: %w", err)
}

s.reserved = make(map[types.NamespacedName]map[string]int32, len(vPods))
for _, vPod := range vPods {
s.reserved[vPod.GetKey()] = make(map[string]int32, len(vPod.GetPlacements()))
for _, placement := range vPod.GetPlacements() {
s.reserved[vPod.GetKey()][placement.PodName] += placement.VReplicas
}
}
return nil
}

Expand Down Expand Up @@ -215,21 +241,27 @@ func (s *StatefulSetScheduler) scheduleVPod(ctx context.Context, vpod scheduler.
return nil, err
}

existingPlacements := vpod.GetPlacements()

reservedByPodName := make(map[string]int32, 2)
for k, v := range s.reserved {
if k.Namespace == vpod.GetKey().Namespace && k.Name == vpod.GetKey().Name {
// Skip adding reserved for our vpod.
continue
}
for _, v := range s.reserved {
for podName, vReplicas := range v {
v, _ := reservedByPodName[podName]

Check failure on line 247 in pkg/scheduler/statefulset/scheduler.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

S1005: unnecessary assignment to the blank identifier (gosimple)
reservedByPodName[podName] = vReplicas + v
}
}

logger.Debugw("scheduling",
// Use reserved placements as starting point, if we have them.
existingPlacements := make([]duckv1alpha1.Placement, 0)
if placements, ok := s.reserved[vpod.GetKey()]; ok {
existingPlacements = make([]duckv1alpha1.Placement, 0, len(placements))
for podName, n := range placements {
existingPlacements = append(existingPlacements, duckv1alpha1.Placement{
PodName: podName,
VReplicas: n,
})
}
}

logger.Debugw("scheduling state",
zap.Any("state", state),
zap.Any("reservedByPodName", reservedByPodName),
zap.Any("reserved", st.ToJSONable(s.reserved)),
Expand All @@ -249,14 +281,13 @@ func (s *StatefulSetScheduler) scheduleVPod(ctx context.Context, vpod scheduler.
}

// Handle overcommitted pods.
f := state.Free(ordinal)
reserved, _ := reservedByPodName[p.PodName]

Check failure on line 284 in pkg/scheduler/statefulset/scheduler.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

S1005: unnecessary assignment to the blank identifier (gosimple)
if f-reserved < 0 {
if state.Capacity-reserved < 0 {
// vr > free => vr: 9, overcommit 4 -> free: 0, vr: 5, pending: +4
// vr = free => vr: 4, overcommit 4 -> free: 0, vr: 0, pending: +4
// vr < free => vr: 3, overcommit 4 -> free: -1, vr: 0, pending: +3

overcommit := -(f - reserved)
overcommit := -(state.Capacity - reserved)

logger.Debugw("overcommit", zap.Any("overcommit", overcommit), zap.Any("placement", p))

Expand Down Expand Up @@ -374,7 +405,7 @@ func (s *StatefulSetScheduler) addReplicas(states *st.State, reservedByPodName m
ordinal := st.OrdinalFromPodName(podName)
reserved, _ := reservedByPodName[podName]

Check failure on line 406 in pkg/scheduler/statefulset/scheduler.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

S1005: unnecessary assignment to the blank identifier (gosimple)
// Is there space?
if f := states.Free(ordinal); f-reserved > 0 {
if states.Capacity-reserved > 0 {
foundFreeCandidate = true
allocation := int32(1)

Expand All @@ -384,7 +415,7 @@ func (s *StatefulSetScheduler) addReplicas(states *st.State, reservedByPodName m
})

diff -= allocation
states.SetFree(ordinal, f-allocation)
reservedByPodName[podName] += allocation
}
}
}
Expand All @@ -403,7 +434,13 @@ func (s *StatefulSetScheduler) candidatesOrdered(states *st.State, vpod schedule
lastIdx := states.Replicas - 1

// De-prioritize existing placements pods, add existing placements to the tail of the candidates.
for _, placement := range placements {
// Start from the last one so that within the "existing replicas" group, we prioritize lower ordinals
// to reduce compaction.
for i := len(placements) - 1; i >= 0; i-- {
placement := placements[i]
if !states.IsSchedulablePod(st.OrdinalFromPodName(placement.PodName)) {
continue
}
// This should really never happen as placements are de-duped, however, better to handle
// edge cases in case the prerequisite doesn't hold in the future.
if existingPlacements.Has(placement.PodName) {
Expand All @@ -417,6 +454,9 @@ func (s *StatefulSetScheduler) candidatesOrdered(states *st.State, vpod schedule
// Prioritize reserved placements that don't appear in the committed placements.
if reserved, ok := s.reserved[vpod.GetKey()]; ok {
for podName := range reserved {
if !states.IsSchedulablePod(st.OrdinalFromPodName(podName)) {
continue
}
if existingPlacements.Has(podName) {
continue
}
Expand All @@ -429,6 +469,9 @@ func (s *StatefulSetScheduler) candidatesOrdered(states *st.State, vpod schedule
// Add all the ordinals to the candidates list.
// De-prioritize the last ordinals over lower ordinals so that we reduce the chances for compaction.
for ordinal := s.replicas - 1; ordinal >= 0; ordinal-- {
if !states.IsSchedulablePod(ordinal) {
continue
}
podName := st.PodNameFromOrdinal(states.StatefulSetName, ordinal)
if existingPlacements.Has(podName) {
continue
Expand Down Expand Up @@ -510,4 +553,4 @@ func upsertPlacements(placements []duckv1alpha1.Placement, placement duckv1alpha
placements = append(placements, placement)
}
return placements
}
}
53 changes: 43 additions & 10 deletions pkg/scheduler/statefulset/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake"
"knative.dev/pkg/controller"
"knative.dev/pkg/reconciler"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
listers "knative.dev/eventing/pkg/reconciler/testing/v1"
Expand Down Expand Up @@ -310,8 +311,7 @@ func TestStatefulsetScheduler(t *testing.T) {
vreplicas: 2,
replicas: int32(4),
expected: []duckv1alpha1.Placement{
{PodName: "statefulset-name-1", VReplicas: 1},
{PodName: "statefulset-name-3", VReplicas: 1},
{PodName: "statefulset-name-3", VReplicas: 2},
},
initialReserved: map[types.NamespacedName]map[string]int32{
types.NamespacedName{Namespace: vpodNamespace + "-a", Name: vpodName}: {
Expand All @@ -326,8 +326,7 @@ func TestStatefulsetScheduler(t *testing.T) {
"statefulset-name-0": 10,
},
types.NamespacedName{Namespace: vpodNamespace, Name: vpodName}: {
"statefulset-name-3": 1,
"statefulset-name-1": 1,
"statefulset-name-3": 2,
},
},
},
Expand Down Expand Up @@ -404,9 +403,9 @@ func TestStatefulsetScheduler(t *testing.T) {
{PodName: "statefulset-name-3", VReplicas: 1},
},
expected: []duckv1alpha1.Placement{
{PodName: "statefulset-name-1", VReplicas: 1},
{PodName: "statefulset-name-1", VReplicas: 2},
{PodName: "statefulset-name-2", VReplicas: 1},
{PodName: "statefulset-name-3", VReplicas: 2},
{PodName: "statefulset-name-3", VReplicas: 1},
},
initialReserved: map[types.NamespacedName]map[string]int32{
types.NamespacedName{Namespace: vpodNamespace + "-a", Name: vpodName}: {
Expand All @@ -422,9 +421,9 @@ func TestStatefulsetScheduler(t *testing.T) {
"statefulset-name-0": 10,
},
types.NamespacedName{Namespace: vpodNamespace, Name: vpodName}: {
"statefulset-name-1": 1,
"statefulset-name-1": 2,
"statefulset-name-2": 1,
"statefulset-name-3": 2,
"statefulset-name-3": 1,
},
},
},
Expand Down Expand Up @@ -462,7 +461,7 @@ func TestStatefulsetScheduler(t *testing.T) {
},
},
{
name: "issue",
name: "Scale one replica up with many existing placements",
vreplicas: 32,
replicas: int32(2),
placements: []duckv1alpha1.Placement{
Expand Down Expand Up @@ -527,6 +526,36 @@ func TestStatefulsetScheduler(t *testing.T) {
},
capacity: 20,
},
{
name: "Reserved inconsistent with placements",
vreplicas: 32,
replicas: int32(2),
placements: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: 1},
},
expected: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: 13},
{PodName: "statefulset-name-1", VReplicas: 19},
},
initialReserved: map[types.NamespacedName]map[string]int32{
types.NamespacedName{Namespace: vpodNamespace + "-a", Name: vpodName}: {
"statefulset-name-0": 7,
},
types.NamespacedName{Namespace: vpodNamespace, Name: vpodName}: {
"statefulset-name-0": 7,
},
},
expectedReserved: map[types.NamespacedName]map[string]int32{
types.NamespacedName{Namespace: vpodNamespace + "-a", Name: vpodName}: {
"statefulset-name-0": 7,
},
types.NamespacedName{Namespace: vpodNamespace, Name: vpodName}: {
"statefulset-name-0": 13,
"statefulset-name-1": 19,
},
},
capacity: 20,
},
}

for _, tc := range testCases {
Expand All @@ -547,7 +576,7 @@ func TestStatefulsetScheduler(t *testing.T) {
podlist = append(podlist, pod)
}

capacity := int32(0)
capacity := int32(10)
if tc.capacity > 0 {
capacity = tc.capacity
}
Expand All @@ -565,6 +594,10 @@ func TestStatefulsetScheduler(t *testing.T) {
VPodLister: vpodClient.List,
}
s := newStatefulSetScheduler(ctx, cfg, sa, nil)
err = s.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {})
if err != nil {
t.Fatal("unexpected error", err)
}
if tc.initialReserved != nil {
s.reserved = tc.initialReserved
}
Expand Down

0 comments on commit c6dfcf1

Please sign in to comment.