Skip to content

Commit

Permalink
Add benchmark + reduce OrdinalFromPodName calls
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Oct 29, 2024
1 parent 39f8718 commit 55d1ac5
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 11 deletions.
3 changes: 1 addition & 2 deletions pkg/scheduler/state/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package state

import (
"math"
"strconv"
"strings"

Expand All @@ -33,7 +32,7 @@ func PodNameFromOrdinal(name string, ordinal int32) string {
func OrdinalFromPodName(podName string) int32 {
ordinal, err := strconv.ParseInt(podName[strings.LastIndex(podName, "-")+1:], 10, 32)
if err != nil {
return math.MaxInt32
panic(podName + " is not a valid pod name")
}
return int32(ordinal)
}
Expand Down
21 changes: 13 additions & 8 deletions pkg/scheduler/statefulset/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ func (s *StatefulSetScheduler) scheduleVPod(ctx context.Context, vpod scheduler.
}
}

sort.SliceStable(existingPlacements, func(i int, j int) bool {
return st.OrdinalFromPodName(existingPlacements[i].PodName) < st.OrdinalFromPodName(existingPlacements[j].PodName)
})

logger.Debugw("scheduling state",
zap.Any("state", state),
zap.Any("reservedByPodName", reservedByPodName),
Expand Down Expand Up @@ -399,12 +403,12 @@ func (s *StatefulSetScheduler) addReplicas(states *st.State, reservedByPodName m
foundFreeCandidate := true
for diff > 0 && foundFreeCandidate {
foundFreeCandidate = false
for _, podName := range candidates {
for _, ordinal := range candidates {
if diff <= 0 {
break
}

ordinal := st.OrdinalFromPodName(podName)
podName := st.PodNameFromOrdinal(states.StatefulSetName, ordinal)
reserved, _ := reservedByPodName[podName]

Check failure on line 412 in pkg/scheduler/statefulset/scheduler.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

S1005: unnecessary assignment to the blank identifier (gosimple)
// Is there space?
if states.Capacity-reserved > 0 {
Expand All @@ -428,9 +432,9 @@ func (s *StatefulSetScheduler) addReplicas(states *st.State, reservedByPodName m
return newPlacements, diff
}

func (s *StatefulSetScheduler) candidatesOrdered(states *st.State, vpod scheduler.VPod, placements []duckv1alpha1.Placement) []string {
func (s *StatefulSetScheduler) candidatesOrdered(states *st.State, vpod scheduler.VPod, placements []duckv1alpha1.Placement) []int32 {
existingPlacements := sets.New[string]()
candidates := make([]string, states.Replicas)
candidates := make([]int32, states.Replicas)

firstIdx := 0
lastIdx := states.Replicas - 1
Expand All @@ -440,15 +444,16 @@ func (s *StatefulSetScheduler) candidatesOrdered(states *st.State, vpod schedule
// to reduce compaction.
for i := len(placements) - 1; i >= 0; i-- {
placement := placements[i]
if !states.IsSchedulablePod(st.OrdinalFromPodName(placement.PodName)) {
ordinal := st.OrdinalFromPodName(placement.PodName)
if !states.IsSchedulablePod(ordinal) {
continue
}
// This should really never happen as placements are de-duped, however, better to handle
// edge cases in case the prerequisite doesn't hold in the future.
if existingPlacements.Has(placement.PodName) {
continue
}
candidates[lastIdx] = placement.PodName
candidates[lastIdx] = ordinal
lastIdx--
existingPlacements.Insert(placement.PodName)
}
Expand All @@ -462,7 +467,7 @@ func (s *StatefulSetScheduler) candidatesOrdered(states *st.State, vpod schedule
if existingPlacements.Has(podName) {
continue
}
candidates[firstIdx] = podName
candidates[firstIdx] = st.OrdinalFromPodName(podName)
firstIdx++
existingPlacements.Insert(podName)
}
Expand All @@ -478,7 +483,7 @@ func (s *StatefulSetScheduler) candidatesOrdered(states *st.State, vpod schedule
if existingPlacements.Has(podName) {
continue
}
candidates[lastIdx] = podName
candidates[lastIdx] = ordinal
lastIdx--
}
return candidates
Expand Down
47 changes: 47 additions & 0 deletions pkg/scheduler/statefulset/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package statefulset
import (
"context"
"fmt"
"math/rand"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -718,3 +719,49 @@ func TestStatefulsetScheduler(t *testing.T) {
})
}
}

func BenchmarkSchedule(b *testing.B) {
ctx, _ := tscheduler.SetupFakeContext(b)

_, err := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs).Create(ctx, tscheduler.MakeStatefulset(testNs, sfsName, 10000), metav1.CreateOptions{})
if err != nil {
b.Fatal("unexpected error", err)
}

vpodClient := tscheduler.NewVPodClient()
for i := 0; i < 1000; i++ {
vpodClient.Create(vpodNamespace, vpodName, rand.Int31n(100), nil)
}
k8s := kubeclient.Get(ctx).CoreV1()

podlist := make([]runtime.Object, 0)
for i := int32(0); i < 10000; i++ {
nodeName := "node" + fmt.Sprint(i)
podName := sfsName + "-" + fmt.Sprint(i)
pod, err := k8s.Pods(testNs).Create(ctx, tscheduler.MakePod(testNs, podName, nodeName), metav1.CreateOptions{})
if err != nil {
b.Fatal("unexpected error", err)
}
podlist = append(podlist, pod)
}
lsp := listers.NewListers(podlist)
scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5})
sa := state.NewStateBuilder(sfsName, vpodClient.List, 20, lsp.GetPodLister().Pods(testNs), scaleCache)
cfg := &Config{
StatefulSetNamespace: testNs,
StatefulSetName: sfsName,
VPodLister: vpodClient.List,
}
s := newStatefulSetScheduler(ctx, cfg, sa, nil)
err = s.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {})
if err != nil {
b.Fatal("unexpected error", err)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
if _, err := s.Schedule(ctx, vpodClient.Random()); err != nil {
b.Fatal("unexpected error", err)
}
}
}
9 changes: 9 additions & 0 deletions pkg/scheduler/testing/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package testing

import (
"math/rand"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/eventing/pkg/scheduler"
)
Expand Down Expand Up @@ -51,3 +53,10 @@ func (s *VPodClient) Append(vpod scheduler.VPod) {
func (s *VPodClient) List() ([]scheduler.VPod, error) {
return s.lister()
}

func (s *VPodClient) Random() scheduler.VPod {
s.store.lock.Lock()
defer s.store.lock.Unlock()

return s.store.vpods[rand.Intn(len(s.store.vpods))]
}
2 changes: 1 addition & 1 deletion pkg/scheduler/testing/vpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func MakePod(ns, name, nodename string) *v1.Pod {
return obj
}

func SetupFakeContext(t *testing.T) (context.Context, context.CancelFunc) {
func SetupFakeContext(t testing.TB) (context.Context, context.CancelFunc) {
ctx, cancel, informers := rectesting.SetupFakeContextWithCancel(t)
err := controller.StartInformers(ctx.Done(), informers...)
if err != nil {
Expand Down

0 comments on commit 55d1ac5

Please sign in to comment.