Cache statefulset scale update/get requests #7651

Merged · 4 commits · Feb 6, 2024
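This PR puts a ScaleCache in front of the StatefulSet scale subresource used by the statefulset scheduler: GetScale is answered from an expiring in-memory entry when a fresh one exists, UpdateScale writes through to the API server and refreshes the entry, and the cache is emptied whenever a replica is promoted to leader. A minimal construction sketch follows; the buildScaleCache helper and the five-minute refresh period are illustrative assumptions (the tests in this PR use the same period), not part of the change itself.

package example

import (
    "context"
    "time"

    clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"

    "knative.dev/eventing/pkg/scheduler"
)

// buildScaleCache is a hypothetical helper: it wraps a typed StatefulSet
// client (which already implements GetScale/UpdateScale and therefore
// satisfies scheduler.ScaleClient) in the new expiring cache.
func buildScaleCache(ctx context.Context, ns string, sfs clientappsv1.StatefulSetInterface) *scheduler.ScaleCache {
    return scheduler.NewScaleCache(ctx, ns, sfs, scheduler.ScaleCacheConfig{
        RefreshPeriod: 5 * time.Minute, // cached entries are dropped after this period
    })
}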
98 changes: 98 additions & 0 deletions pkg/scheduler/scheduler.go
@@ -17,8 +17,15 @@ limitations under the License.
package scheduler

import (
    "context"
    "sync"
    "time"

    autoscalingv1 "k8s.io/api/autoscaling/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/cache"

    duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
)
@@ -114,3 +121,94 @@ type VPod interface {

    GetResourceVersion() string
}

type ScaleClient interface {
    GetScale(ctx context.Context, name string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
    UpdateScale(ctx context.Context, name string, scale *autoscalingv1.Scale, options metav1.UpdateOptions) (*autoscalingv1.Scale, error)
}

type ScaleCacheConfig struct {
    RefreshPeriod time.Duration `json:"refreshPeriod"`
}

type ScaleCache struct {
    entriesMu            sync.RWMutex // protects access to entries, entries itself is concurrency safe, so we only need to ensure that we correctly access the pointer
    entries              *cache.Expiring
    scaleClient          ScaleClient
    statefulSetNamespace string
    config               ScaleCacheConfig
}

type scaleEntry struct {
    specReplicas int32
    statReplicas int32
}

func NewScaleCache(ctx context.Context, namespace string, scaleClient ScaleClient, config ScaleCacheConfig) *ScaleCache {
    return &ScaleCache{
        entries:              cache.NewExpiring(),
        scaleClient:          scaleClient,
        statefulSetNamespace: namespace,
        config:               config,
    }
}

func (sc *ScaleCache) GetScale(ctx context.Context, statefulSetName string, options metav1.GetOptions) (*autoscalingv1.Scale, error) {
    sc.entriesMu.RLock()
    defer sc.entriesMu.RUnlock()

    if entry, ok := sc.entries.Get(statefulSetName); ok {
        entry := entry.(scaleEntry)
        return &autoscalingv1.Scale{
            ObjectMeta: metav1.ObjectMeta{
                Name:      statefulSetName,
                Namespace: sc.statefulSetNamespace,
            },
            Spec: autoscalingv1.ScaleSpec{
                Replicas: entry.specReplicas,
            },
            Status: autoscalingv1.ScaleStatus{
                Replicas: entry.statReplicas,
            },
        }, nil
    }

    scale, err := sc.scaleClient.GetScale(ctx, statefulSetName, options)
    if err != nil {
        return scale, err
    }

    sc.setScale(statefulSetName, scale)

    return scale, nil
}

func (sc *ScaleCache) UpdateScale(ctx context.Context, statefulSetName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error) {
    sc.entriesMu.RLock()
    defer sc.entriesMu.RUnlock()

    updatedScale, err := sc.scaleClient.UpdateScale(ctx, statefulSetName, scale, opts)
    if err != nil {
        return updatedScale, err
    }

    sc.setScale(statefulSetName, updatedScale)

    return updatedScale, nil
}

func (sc *ScaleCache) Reset() {
    sc.entriesMu.Lock()
    defer sc.entriesMu.Unlock()

    sc.entries = cache.NewExpiring()
}

func (sc *ScaleCache) setScale(name string, scale *autoscalingv1.Scale) {
    entry := scaleEntry{
        specReplicas: scale.Spec.Replicas,
        statReplicas: scale.Status.Replicas,
    }

    sc.entries.Set(name, entry, sc.config.RefreshPeriod)
}
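The cache-hit path above never touches the API server: it rebuilds an autoscalingv1.Scale from the two stored replica counts, while a miss falls through to the wrapped client and populates the entry for RefreshPeriod. UpdateScale always writes through and refreshes the entry, and Reset swaps in a brand-new expiring cache, which is why the read and write paths only take the read lock while Reset takes the write lock. Below is a minimal sketch of that behaviour against a stub client; the countingScaleClient type and the test itself are hypothetical, not part of this PR.

package scheduler_test

import (
    "context"
    "testing"
    "time"

    autoscalingv1 "k8s.io/api/autoscaling/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "knative.dev/eventing/pkg/scheduler"
)

// countingScaleClient is a hypothetical stub that records how often the
// "API server" is actually hit.
type countingScaleClient struct {
    gets int
}

func (c *countingScaleClient) GetScale(ctx context.Context, name string, _ metav1.GetOptions) (*autoscalingv1.Scale, error) {
    c.gets++
    return &autoscalingv1.Scale{
        ObjectMeta: metav1.ObjectMeta{Name: name},
        Spec:       autoscalingv1.ScaleSpec{Replicas: 3},
    }, nil
}

func (c *countingScaleClient) UpdateScale(ctx context.Context, name string, s *autoscalingv1.Scale, _ metav1.UpdateOptions) (*autoscalingv1.Scale, error) {
    return s, nil
}

func TestScaleCacheSketch(t *testing.T) {
    ctx := context.Background()
    client := &countingScaleClient{}
    sc := scheduler.NewScaleCache(ctx, "test-ns", client, scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute})

    // First read misses the cache and hits the client.
    _, _ = sc.GetScale(ctx, "my-statefulset", metav1.GetOptions{})
    // Second read within RefreshPeriod is served from the cache.
    _, _ = sc.GetScale(ctx, "my-statefulset", metav1.GetOptions{})
    if client.gets != 1 {
        t.Fatalf("expected 1 API call, got %d", client.gets)
    }

    // Reset (done on leader promotion in the autoscaler) drops all entries,
    // so the next read goes back to the client.
    sc.Reset()
    _, _ = sc.GetScale(ctx, "my-statefulset", metav1.GetOptions{})
    if client.gets != 2 {
        t.Fatalf("expected 2 API calls, got %d", client.gets)
    }
}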
50 changes: 24 additions & 26 deletions pkg/scheduler/state/state.go
@@ -31,10 +31,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
corev1 "k8s.io/client-go/listers/core/v1"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/scheduler"
@@ -154,34 +152,34 @@ func (s *State) IsSchedulablePod(ordinal int32) bool {

// stateBuilder reconstruct the state from scratch, by listing vpods
type stateBuilder struct {
ctx context.Context
logger *zap.SugaredLogger
vpodLister scheduler.VPodLister
capacity int32
schedulerPolicy scheduler.SchedulerPolicyType
nodeLister corev1.NodeLister
statefulSetClient clientappsv1.StatefulSetInterface
statefulSetName string
podLister corev1.PodNamespaceLister
schedPolicy *scheduler.SchedulerPolicy
deschedPolicy *scheduler.SchedulerPolicy
ctx context.Context
logger *zap.SugaredLogger
vpodLister scheduler.VPodLister
capacity int32
schedulerPolicy scheduler.SchedulerPolicyType
nodeLister corev1.NodeLister
statefulSetCache *scheduler.ScaleCache
statefulSetName string
podLister corev1.PodNamespaceLister
schedPolicy *scheduler.SchedulerPolicy
deschedPolicy *scheduler.SchedulerPolicy
}

// NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested
func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister) StateAccessor {
func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor {

return &stateBuilder{
ctx: ctx,
logger: logging.FromContext(ctx),
vpodLister: lister,
capacity: podCapacity,
schedulerPolicy: schedulerPolicy,
nodeLister: nodeLister,
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace),
statefulSetName: sfsname,
podLister: podlister,
schedPolicy: schedPolicy,
deschedPolicy: deschedPolicy,
ctx: ctx,
logger: logging.FromContext(ctx),
vpodLister: lister,
capacity: podCapacity,
schedulerPolicy: schedulerPolicy,
nodeLister: nodeLister,
statefulSetCache: statefulSetCache,
statefulSetName: sfsname,
podLister: podlister,
schedPolicy: schedPolicy,
deschedPolicy: deschedPolicy,
}
}

@@ -191,7 +189,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
return nil, err
}

scale, err := s.statefulSetClient.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{})
scale, err := s.statefulSetCache.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{})
if err != nil {
s.logger.Infow("failed to get statefulset", zap.Error(err))
return nil, err
5 changes: 4 additions & 1 deletion pkg/scheduler/state/state_test.go
@@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
@@ -642,7 +643,9 @@ func TestStateBuilder(t *testing.T) {
lsp := listers.NewListers(podlist)
lsn := listers.NewListers(nodelist)

stateBuilder := NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister())
scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5})

stateBuilder := NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache)
state, err := stateBuilder.State(tc.reserved)
if err != nil {
t.Fatal("unexpected error", err)
48 changes: 24 additions & 24 deletions pkg/scheduler/statefulset/autoscaler.go
@@ -28,10 +28,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
"knative.dev/pkg/reconciler"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/scheduler"
@@ -57,13 +55,13 @@ }
}

type autoscaler struct {
statefulSetClient clientappsv1.StatefulSetInterface
statefulSetName string
vpodLister scheduler.VPodLister
logger *zap.SugaredLogger
stateAccessor st.StateAccessor
trigger chan struct{}
evictor scheduler.Evictor
statefulSetCache *scheduler.ScaleCache
statefulSetName string
vpodLister scheduler.VPodLister
logger *zap.SugaredLogger
stateAccessor st.StateAccessor
trigger chan struct{}
evictor scheduler.Evictor

// capacity is the total number of virtual replicas available per pod.
capacity int32
@@ -92,6 +90,8 @@ func (a *autoscaler) Promote(b reconciler.Bucket, _ func(reconciler.Bucket, type
if b.Has(ephemeralLeaderElectionObject) {
// The promoted bucket has the ephemeralLeaderElectionObject, so we are leader.
a.isLeader.Store(true)
// reset the cache to be empty so that when we access state as the leader it is always the newest values
a.statefulSetCache.Reset()
}
return nil
}
@@ -104,20 +104,20 @@ func (a *autoscaler) Demote(b reconciler.Bucket) {
}
}

func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor) *autoscaler {
func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor, statefulSetCache *scheduler.ScaleCache) *autoscaler {
return &autoscaler{
logger: logging.FromContext(ctx).With(zap.String("component", "autoscaler")),
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace),
statefulSetName: cfg.StatefulSetName,
vpodLister: cfg.VPodLister,
stateAccessor: stateAccessor,
evictor: cfg.Evictor,
trigger: make(chan struct{}, 1),
capacity: cfg.PodCapacity,
refreshPeriod: cfg.RefreshPeriod,
lock: new(sync.Mutex),
isLeader: atomic.Bool{},
getReserved: cfg.getReserved,
logger: logging.FromContext(ctx).With(zap.String("component", "autoscaler")),
statefulSetCache: statefulSetCache,
statefulSetName: cfg.StatefulSetName,
vpodLister: cfg.VPodLister,
stateAccessor: stateAccessor,
evictor: cfg.Evictor,
trigger: make(chan struct{}, 1),
capacity: cfg.PodCapacity,
refreshPeriod: cfg.RefreshPeriod,
lock: new(sync.Mutex),
isLeader: atomic.Bool{},
getReserved: cfg.getReserved,
// Anything that is less than now() - refreshPeriod, so that we will try to compact
// as soon as we start.
lastCompactAttempt: time.Now().
@@ -183,7 +183,7 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err
return err
}

scale, err := a.statefulSetClient.GetScale(ctx, a.statefulSetName, metav1.GetOptions{})
scale, err := a.statefulSetCache.GetScale(ctx, a.statefulSetName, metav1.GetOptions{})
if err != nil {
// skip a beat
a.logger.Infow("failed to get scale subresource", zap.Error(err))
@@ -236,7 +236,7 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err
scale.Spec.Replicas = newreplicas
a.logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas))

_, err = a.statefulSetClient.UpdateScale(ctx, a.statefulSetName, scale, metav1.UpdateOptions{})
_, err = a.statefulSetCache.UpdateScale(ctx, a.statefulSetName, scale, metav1.UpdateOptions{})
if err != nil {
a.logger.Errorw("updating scale subresource failed", zap.Error(err))
return err
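Within the autoscaler, both GetScale and UpdateScale now go through the shared ScaleCache, and the cache is emptied on promotion so a replica that has just become leader never autoscales from replica counts it cached while it was a follower. A stripped-down sketch of that promotion path follows, with the reconciler.Bucket check reduced to a plain boolean; the leaderAware type and promote method are illustrative only, not the PR's actual code.

package example

import (
    "sync/atomic"

    "knative.dev/eventing/pkg/scheduler"
)

// leaderAware is a stand-in for the autoscaler holding only the two fields
// relevant to the promotion path in this diff.
type leaderAware struct {
    isLeader atomic.Bool
    cache    *scheduler.ScaleCache
}

// promote mirrors autoscaler.Promote with the leader-election bucket check
// reduced to a boolean: on becoming leader, the shared cache is emptied so
// the next autoscale pass re-reads the scale subresource from the API server.
func (l *leaderAware) promote(holdsLeaderBucket bool) {
    if !holdsLeaderBucket {
        return
    }
    l.isLeader.Store(true)
    l.cache.Reset()
}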
16 changes: 10 additions & 6 deletions pkg/scheduler/statefulset/autoscaler_test.go
@@ -449,7 +449,9 @@ func TestAutoscaler(t *testing.T) {
lsnn = lsn.GetNodeLister()
}

stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lspp, lsnn)
scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5})

stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lspp, lsnn, scaleCache)

sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs)
_, err := sfsClient.Create(ctx, tscheduler.MakeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{})
@@ -472,7 +474,7 @@ }
return tc.reserved
},
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache)
_ = autoscaler.Promote(reconciler.UniversalBucket(), nil)

for _, vpod := range tc.vpods {
@@ -509,7 +511,8 @@ func TestAutoscalerScaleDownToZero(t *testing.T) {

vpodClient := tscheduler.NewVPodClient()
ls := listers.NewListers(nil)
stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, scheduler.MAXFILLUP, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, nil, ls.GetNodeLister())
scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5})
stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, scheduler.MAXFILLUP, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, nil, ls.GetNodeLister(), scaleCache)

sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs)
_, err := sfsClient.Create(ctx, tscheduler.MakeStatefulset(testNs, sfsName, 10), metav1.CreateOptions{})
@@ -532,7 +535,7 @@ }
return nil
},
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache)
_ = autoscaler.Promote(reconciler.UniversalBucket(), nil)

done := make(chan bool)
@@ -946,7 +949,8 @@ func TestCompactor(t *testing.T) {

lsp := listers.NewListers(podlist)
lsn := listers.NewListers(nodelist)
stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister())
scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5})
stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache)

evictions := make(map[types.NamespacedName][]duckv1alpha1.Placement)
recordEviction := func(pod *corev1.Pod, vpod scheduler.VPod, from *duckv1alpha1.Placement) error {
@@ -962,7 +966,7 @@ }
RefreshPeriod: 10 * time.Second,
PodCapacity: 10,
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache)
_ = autoscaler.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {})
assert.Equal(t, true, autoscaler.isLeader.Load())

7 changes: 5 additions & 2 deletions pkg/scheduler/statefulset/scheduler.go
@@ -64,6 +64,7 @@ type Config struct {
StatefulSetNamespace string `json:"statefulSetNamespace"`
StatefulSetName string `json:"statefulSetName"`

ScaleCacheConfig scheduler.ScaleCacheConfig `json:"scaleCacheConfig"`
// PodCapacity max capacity for each StatefulSet's pod.
PodCapacity int32 `json:"podCapacity"`
// Autoscaler refresh period
@@ -87,14 +88,16 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) {
podInformer := podinformer.Get(ctx)
podLister := podInformer.Lister().Pods(cfg.StatefulSetNamespace)

stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister)
scaleCache := scheduler.NewScaleCache(ctx, cfg.StatefulSetNamespace, kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), cfg.ScaleCacheConfig)

stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister, scaleCache)

var getReserved GetReserved
cfg.getReserved = func() map[types.NamespacedName]map[string]int32 {
return getReserved()
}

autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache)

var wg sync.WaitGroup
wg.Add(1)
3 changes: 2 additions & 1 deletion pkg/scheduler/statefulset/scheduler_test.go
@@ -803,7 +803,8 @@ func TestStatefulsetScheduler(t *testing.T) {
}
lsp := listers.NewListers(podlist)
lsn := listers.NewListers(nodelist)
sa := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister())
scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5})
sa := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache)
cfg := &Config{
StatefulSetNamespace: testNs,
StatefulSetName: sfsName,