Cache statefulset scale update/get requests #7651

Merged · 4 commits · Feb 6, 2024
Changes from 3 commits
89 changes: 89 additions & 0 deletions pkg/scheduler/scheduler.go
@@ -17,10 +17,19 @@ limitations under the License.
package scheduler

import (
"context"
"sync"
"time"

autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/cache"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
)

type SchedulerPolicyType string
@@ -114,3 +123,83 @@ type VPod interface {

GetResourceVersion() string
}

type ScaleCache struct {
lock sync.RWMutex //protects access to entries, entries itself is concurrency safe, so we only need to ensure that we correctly access the pointer
Review comment (Member): Let's be specific on what this lock is associated with; the usual convention is <name> + Mu.

Suggested change:
-	lock sync.RWMutex //protects access to entries, entries itself is concurrency safe, so we only need to ensure that we correctly access the pointer
+	entriesMu sync.RWMutex // protects access to entries, entries itself is concurrency safe, so we only need to ensure that we correctly access the pointer

entries *cache.Expiring
statefulSetClient clientappsv1.StatefulSetInterface
statefulSetNamespace string
}

type scaleEntry struct {
specReplicas int32
statReplicas int32
}

func NewScaleCache(ctx context.Context, namespace string) *ScaleCache {
return &ScaleCache{
entries: cache.NewExpiring(),
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace),
statefulSetNamespace: namespace,
}
}

func (sc *ScaleCache) GetScale(ctx context.Context, statefulSetName string, options metav1.GetOptions) (*autoscalingv1.Scale, error) {
sc.lock.RLock()
defer sc.lock.RUnlock()

if entry, ok := sc.entries.Get(statefulSetName); ok {
entry := entry.(scaleEntry)
return &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: statefulSetName,
Namespace: sc.statefulSetNamespace,
},
Spec: autoscalingv1.ScaleSpec{
Replicas: entry.specReplicas,
},
Status: autoscalingv1.ScaleStatus{
Replicas: entry.statReplicas,
},
}, nil
}

scale, err := sc.statefulSetClient.GetScale(ctx, statefulSetName, options)
if err != nil {
return scale, err
}

sc.setScale(statefulSetName, scale)

return scale, nil
}

func (sc *ScaleCache) UpdateScale(ctx context.Context, statefulSetName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error) {
sc.lock.RLock()
defer sc.lock.RUnlock()

updatedScale, err := sc.statefulSetClient.UpdateScale(ctx, statefulSetName, scale, opts)
if err != nil {
return updatedScale, err
}

sc.setScale(statefulSetName, updatedScale)

return updatedScale, nil
}

func (sc *ScaleCache) Reset() {
sc.lock.Lock()
defer sc.lock.Unlock()

sc.entries = cache.NewExpiring()
}

func (sc *ScaleCache) setScale(name string, scale *autoscalingv1.Scale) {
entry := scaleEntry{
specReplicas: scale.Spec.Replicas,
statReplicas: scale.Status.Replicas,
}

sc.entries.Set(name, entry, time.Minute*5)
}
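
For reviewers less familiar with the new type, here is a minimal usage sketch, not part of this PR: the namespace and StatefulSet name are hypothetical, and ctx is assumed to carry an injected kube client, as in the tests below.

	package scratch // illustrative only

	import (
		"context"

		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

		"knative.dev/eventing/pkg/scheduler"
	)

	// scaleOnce shows the intended call pattern against the new ScaleCache.
	func scaleOnce(ctx context.Context) error {
		sc := scheduler.NewScaleCache(ctx, "knative-eventing")

		// First read misses the cache and falls through to the API server.
		scale, err := sc.GetScale(ctx, "kafka-source-dispatcher", metav1.GetOptions{})
		if err != nil {
			return err
		}

		// Reads within the five-minute expiry window are served from memory.
		if _, err := sc.GetScale(ctx, "kafka-source-dispatcher", metav1.GetOptions{}); err != nil {
			return err
		}

		// Writes always hit the API server and refresh the cached entry.
		scale.Spec.Replicas = 3
		_, err = sc.UpdateScale(ctx, "kafka-source-dispatcher", scale, metav1.UpdateOptions{})
		return err
	}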
50 changes: 24 additions & 26 deletions pkg/scheduler/state/state.go
@@ -31,10 +31,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
corev1 "k8s.io/client-go/listers/core/v1"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/scheduler"
@@ -154,34 +152,34 @@ func (s *State) IsSchedulablePod(ordinal int32) bool {

// stateBuilder reconstruct the state from scratch, by listing vpods
type stateBuilder struct {
ctx context.Context
logger *zap.SugaredLogger
vpodLister scheduler.VPodLister
capacity int32
schedulerPolicy scheduler.SchedulerPolicyType
nodeLister corev1.NodeLister
statefulSetClient clientappsv1.StatefulSetInterface
statefulSetName string
podLister corev1.PodNamespaceLister
schedPolicy *scheduler.SchedulerPolicy
deschedPolicy *scheduler.SchedulerPolicy
ctx context.Context
logger *zap.SugaredLogger
vpodLister scheduler.VPodLister
capacity int32
schedulerPolicy scheduler.SchedulerPolicyType
nodeLister corev1.NodeLister
statefulSetCache *scheduler.ScaleCache
statefulSetName string
podLister corev1.PodNamespaceLister
schedPolicy *scheduler.SchedulerPolicy
deschedPolicy *scheduler.SchedulerPolicy
}

// NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested
func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister) StateAccessor {
func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor {

return &stateBuilder{
ctx: ctx,
logger: logging.FromContext(ctx),
vpodLister: lister,
capacity: podCapacity,
schedulerPolicy: schedulerPolicy,
nodeLister: nodeLister,
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace),
statefulSetName: sfsname,
podLister: podlister,
schedPolicy: schedPolicy,
deschedPolicy: deschedPolicy,
ctx: ctx,
logger: logging.FromContext(ctx),
vpodLister: lister,
capacity: podCapacity,
schedulerPolicy: schedulerPolicy,
nodeLister: nodeLister,
statefulSetCache: statefulSetCache,
statefulSetName: sfsname,
podLister: podlister,
schedPolicy: schedPolicy,
deschedPolicy: deschedPolicy,
}
}

@@ -191,7 +189,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
return nil, err
}

scale, err := s.statefulSetClient.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{})
scale, err := s.statefulSetCache.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{})
if err != nil {
s.logger.Infow("failed to get statefulset", zap.Error(err))
return nil, err
4 changes: 3 additions & 1 deletion pkg/scheduler/state/state_test.go
@@ -642,7 +642,9 @@ func TestStateBuilder(t *testing.T) {
lsp := listers.NewListers(podlist)
lsn := listers.NewListers(nodelist)

stateBuilder := NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister())
scaleCache := scheduler.NewScaleCache(ctx, testNs)

stateBuilder := NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache)
state, err := stateBuilder.State(tc.reserved)
if err != nil {
t.Fatal("unexpected error", err)
48 changes: 24 additions & 24 deletions pkg/scheduler/statefulset/autoscaler.go
@@ -28,10 +28,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
"knative.dev/pkg/reconciler"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/scheduler"
@@ -57,13 +55,13 @@ type Autoscaler interface {
}

type autoscaler struct {
statefulSetClient clientappsv1.StatefulSetInterface
statefulSetName string
vpodLister scheduler.VPodLister
logger *zap.SugaredLogger
stateAccessor st.StateAccessor
trigger chan struct{}
evictor scheduler.Evictor
statefulSetCache *scheduler.ScaleCache
statefulSetName string
vpodLister scheduler.VPodLister
logger *zap.SugaredLogger
stateAccessor st.StateAccessor
trigger chan struct{}
evictor scheduler.Evictor

// capacity is the total number of virtual replicas available per pod.
capacity int32
@@ -92,6 +90,8 @@ func (a *autoscaler) Promote(b reconciler.Bucket, _ func(reconciler.Bucket, type
if b.Has(ephemeralLeaderElectionObject) {
// The promoted bucket has the ephemeralLeaderElectionObject, so we are leader.
a.isLeader.Store(true)
// reset the cache to be empty so that when we access state as the leader it is always the newest values
a.statefulSetCache.Reset()
}
return nil
}
@@ -104,20 +104,20 @@ func (a *autoscaler) Demote(b reconciler.Bucket) {
}
}

func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor) *autoscaler {
func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor, statefulSetCache *scheduler.ScaleCache) *autoscaler {
return &autoscaler{
logger: logging.FromContext(ctx).With(zap.String("component", "autoscaler")),
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace),
statefulSetName: cfg.StatefulSetName,
vpodLister: cfg.VPodLister,
stateAccessor: stateAccessor,
evictor: cfg.Evictor,
trigger: make(chan struct{}, 1),
capacity: cfg.PodCapacity,
refreshPeriod: cfg.RefreshPeriod,
lock: new(sync.Mutex),
isLeader: atomic.Bool{},
getReserved: cfg.getReserved,
logger: logging.FromContext(ctx).With(zap.String("component", "autoscaler")),
statefulSetCache: statefulSetCache,
statefulSetName: cfg.StatefulSetName,
vpodLister: cfg.VPodLister,
stateAccessor: stateAccessor,
evictor: cfg.Evictor,
trigger: make(chan struct{}, 1),
capacity: cfg.PodCapacity,
refreshPeriod: cfg.RefreshPeriod,
lock: new(sync.Mutex),
isLeader: atomic.Bool{},
getReserved: cfg.getReserved,
// Anything that is less than now() - refreshPeriod, so that we will try to compact
// as soon as we start.
lastCompactAttempt: time.Now().
@@ -183,7 +183,7 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err
return err
}

scale, err := a.statefulSetClient.GetScale(ctx, a.statefulSetName, metav1.GetOptions{})
scale, err := a.statefulSetCache.GetScale(ctx, a.statefulSetName, metav1.GetOptions{})
if err != nil {
// skip a beat
a.logger.Infow("failed to get scale subresource", zap.Error(err))
@@ -236,7 +236,7 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err
scale.Spec.Replicas = newreplicas
a.logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas))

_, err = a.statefulSetClient.UpdateScale(ctx, a.statefulSetName, scale, metav1.UpdateOptions{})
_, err = a.statefulSetCache.UpdateScale(ctx, a.statefulSetName, scale, metav1.UpdateOptions{})
if err != nil {
a.logger.Errorw("updating scale subresource failed", zap.Error(err))
return err
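
The Reset call in Promote above exists because a replica that just became leader may still hold entries cached while it was a follower; resetting swaps in a fresh Expiring, so the first GetScale after promotion always goes back to the API server. A small sketch of the underlying Expiring semantics the cache relies on (the key and value below are illustrative, not from this PR):

	package scratch // illustrative only

	import (
		"fmt"
		"time"

		"k8s.io/apimachinery/pkg/util/cache"
	)

	func expiringSketch() {
		c := cache.NewExpiring()

		// Entries are served from memory until their TTL lapses (ScaleCache uses 5 minutes).
		c.Set("kafka-source-dispatcher", int32(3), 5*time.Minute)
		if v, ok := c.Get("kafka-source-dispatcher"); ok {
			fmt.Println("cached replicas:", v)
		}

		// ScaleCache.Reset simply replaces the Expiring with a new one, so the
		// next GetScale after a Promote cannot observe stale follower-era data.
		c = cache.NewExpiring()
		if _, ok := c.Get("kafka-source-dispatcher"); !ok {
			fmt.Println("cache empty after reset; next read goes to the API server")
		}
	}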
16 changes: 10 additions & 6 deletions pkg/scheduler/statefulset/autoscaler_test.go
@@ -449,7 +449,9 @@ func TestAutoscaler(t *testing.T) {
lsnn = lsn.GetNodeLister()
}

stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lspp, lsnn)
scaleCache := scheduler.NewScaleCache(ctx, testNs)

stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lspp, lsnn, scaleCache)

sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs)
_, err := sfsClient.Create(ctx, tscheduler.MakeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{})
@@ -472,7 +474,7 @@ func TestAutoscaler(t *testing.T) {
return tc.reserved
},
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache)
_ = autoscaler.Promote(reconciler.UniversalBucket(), nil)

for _, vpod := range tc.vpods {
@@ -509,7 +511,8 @@ func TestAutoscalerScaleDownToZero(t *testing.T) {

vpodClient := tscheduler.NewVPodClient()
ls := listers.NewListers(nil)
stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, scheduler.MAXFILLUP, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, nil, ls.GetNodeLister())
scaleCache := scheduler.NewScaleCache(ctx, testNs)
stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, scheduler.MAXFILLUP, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, nil, ls.GetNodeLister(), scaleCache)

sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs)
_, err := sfsClient.Create(ctx, tscheduler.MakeStatefulset(testNs, sfsName, 10), metav1.CreateOptions{})
@@ -532,7 +535,7 @@ func TestAutoscalerScaleDownToZero(t *testing.T) {
return nil
},
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache)
_ = autoscaler.Promote(reconciler.UniversalBucket(), nil)

done := make(chan bool)
@@ -946,7 +949,8 @@ func TestCompactor(t *testing.T) {

lsp := listers.NewListers(podlist)
lsn := listers.NewListers(nodelist)
stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister())
scaleCache := scheduler.NewScaleCache(ctx, testNs)
stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache)

evictions := make(map[types.NamespacedName][]duckv1alpha1.Placement)
recordEviction := func(pod *corev1.Pod, vpod scheduler.VPod, from *duckv1alpha1.Placement) error {
@@ -962,7 +966,7 @@ func TestCompactor(t *testing.T) {
RefreshPeriod: 10 * time.Second,
PodCapacity: 10,
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache)
_ = autoscaler.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {})
assert.Equal(t, true, autoscaler.isLeader.Load())

6 changes: 4 additions & 2 deletions pkg/scheduler/statefulset/scheduler.go
@@ -87,14 +87,16 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) {
podInformer := podinformer.Get(ctx)
podLister := podInformer.Lister().Pods(cfg.StatefulSetNamespace)

stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister)
scaleCache := scheduler.NewScaleCache(ctx, cfg.StatefulSetNamespace)

stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister, scaleCache)

var getReserved GetReserved
cfg.getReserved = func() map[types.NamespacedName]map[string]int32 {
return getReserved()
}

autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache)

var wg sync.WaitGroup
wg.Add(1)
3 changes: 2 additions & 1 deletion pkg/scheduler/statefulset/scheduler_test.go
@@ -803,7 +803,8 @@ func TestStatefulsetScheduler(t *testing.T) {
}
lsp := listers.NewListers(podlist)
lsn := listers.NewListers(nodelist)
sa := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister())
scaleCache := scheduler.NewScaleCache(ctx, testNs)
sa := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache)
cfg := &Config{
StatefulSetNamespace: testNs,
StatefulSetName: sfsName,