Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add proactive scaleup #7145

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ import (
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/podinjection"
podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates"
Expand Down Expand Up @@ -266,6 +269,8 @@ var (
provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
frequentLoopsEnabled = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
asyncNodeGroupsEnabled = flag.Bool("async-node-groups", false, "Whether clusterautoscaler creates and deletes node groups asynchronously. Experimental: requires cloud provider supporting async node group operations, enable at your own risk.")
proactiveScaleupEnabled = flag.Bool("enable-proactive-scaleup", false, "Whether to enable/disable proactive scale-ups, defaults to false")
podInjectionLimit = flag.Int("pod-injection-limit", 5000, "Limits total number of pods while injecting fake pods. If unschedulable pods already exceeds the limit, pod injection is disabled but pods are not truncated.")
)

func isFlagPassed(name string) bool {
Expand Down Expand Up @@ -527,6 +532,20 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
podListProcessor.AddProcessor(injector)
podListProcessor.AddProcessor(provreqProcesor)
}

if *proactiveScaleupEnabled {
podInjectionBackoffRegistry := podinjectionbackoff.NewFakePodControllerRegistry()

podInjectionPodListProcessor := podinjection.NewPodInjectionPodListProcessor(podInjectionBackoffRegistry)
enforceInjectedPodsLimitProcessor := podinjection.NewEnforceInjectedPodsLimitProcessor(*podInjectionLimit)

podListProcessor = pods.NewCombinedPodListProcessor([]pods.PodListProcessor{podInjectionPodListProcessor, podListProcessor, enforceInjectedPodsLimitProcessor})

// FakePodsScaleUpStatusProcessor processor needs to be the first processor in ScaleUpStatusProcessor as it filters out fake pods from
// Scale Up status so that we don't emit events.
opts.Processors.ScaleUpStatusProcessor = podinjection.NewFakePodsScaleUpStatusProcessor(podInjectionBackoffRegistry)
}

opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
if autoscalingOptions.ParallelDrain {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podinjectionbackoff

import (
"time"

"github.com/cenkalti/backoff/v4"
"k8s.io/apimachinery/pkg/types"
)

const (
// baseBackoff is the initial interval handed to the exponential backoff,
// i.e. the duration of the first backoff applied to a controller.
baseBackoff = 5 * time.Minute
// backoffThreshold is the maximum interval of the exponential backoff;
// a single backoff never lasts longer than this.
backoffThreshold = 30 * time.Minute
)

// controllerEntry describes a backed off controller.
type controllerEntry struct {
// until is the instant computed by the most recent BackoffController call
// as that call's `now` plus the backoff interval.
until time.Time
// backoff is the per-controller exponential backoff state; it is stored by
// value, so updated copies must be written back to the registry map.
backoff backoff.ExponentialBackOff
}

// ControllerRegistry contains backed off controllers to be used in time-based
// backing off of controllers considered in fake pod injection.
type ControllerRegistry struct {
// backedOffControllers maps a controller UID to its backoff entry.
// Entries are added or overwritten by BackoffController; nothing in this
// type removes them.
backedOffControllers map[types.UID]controllerEntry
}

// NewFakePodControllerRegistry creates and returns an empty ControllerRegistry
// whose internal map is initialized and ready for writes.
func NewFakePodControllerRegistry() *ControllerRegistry {
	registry := new(ControllerRegistry)
	registry.backedOffControllers = map[types.UID]controllerEntry{}
	return registry
}

// newExponentialBackOff returns an ExponentialBackOff configured with this
// package's non-default values and already Reset, so it is ready for its first
// NextBackOff call. The supplied clock is stored as the backoff's Clock.
func newExponentialBackOff(clock backoff.Clock) backoff.ExponentialBackOff {
	var eb backoff.ExponentialBackOff

	// Intervals start at baseBackoff, grow by the library's default
	// multiplier and are capped at backoffThreshold.
	eb.InitialInterval = baseBackoff
	eb.Multiplier = backoff.DefaultMultiplier
	eb.MaxInterval = backoffThreshold

	// Randomization is disabled for easier testing and better predictability.
	eb.RandomizationFactor = 0

	// Disable stopping if it reaches threshold.
	eb.MaxElapsedTime = 0
	eb.Stop = backoff.Stop
	eb.Clock = clock

	eb.Reset()
	return eb
}

// BackoffController puts the controller identified by ownerUID into backoff,
// starting at now.
//
//   - An empty ownerUID is ignored.
//   - If the controller has no entry, or its previous backoff ended before now,
//     a fresh entry is created and the backoff starts from the base interval.
//   - If the controller is still in backoff at now, the existing backoff state
//     is reused, so its interval increases exponentially.
func (r *ControllerRegistry) BackoffController(ownerUID types.UID, now time.Time) {
	if ownerUID == "" {
		return
	}

	entry, known := r.backedOffControllers[ownerUID]
	if !known || now.After(entry.until) {
		// Unknown controller or expired backoff: start over with fresh state.
		entry = controllerEntry{backoff: newExponentialBackOff(backoff.SystemClock)}
	}

	// NextBackOff both yields the current interval and advances the state, so
	// the following call for this controller gets a longer interval.
	interval := entry.backoff.NextBackOff()
	entry.until = now.Add(interval)

	// The entry is a value copy; store it back so the update is kept.
	r.backedOffControllers[ownerUID] = entry
}

// BackOffUntil returns the instant until which the controller with id `uid`
// is backed off. It returns the zero time.Time when the registry holds no
// entry for `uid`. The `now` argument is currently not consulted.
func (r *ControllerRegistry) BackOffUntil(uid types.UID, now time.Time) time.Time {
	// Indexing a map with a missing key yields the zero controllerEntry,
	// whose `until` field is the zero time.Time.
	return r.backedOffControllers[uid].until
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podinjectionbackoff

import (
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/types"
)

// TestBackoffControllerOfPod verifies the `until` timestamps recorded by
// ControllerRegistry.BackoffController.
//
// Each case first backs off every controller in backoffCounts the given number
// of times at the zero time. When spendTime is set, the clock is then advanced
// by spendTime and every controller is backed off once more, which exercises
// the "previous backoff already expired" reset path.
func TestBackoffControllerOfPod(t *testing.T) {
	c1 := types.UID("c1")
	c2 := types.UID("c2")
	clock := &clock{}

	// Expected values are computed while clock.now is still the zero time.
	testCases := map[string]struct {
		backoffCounts                map[types.UID]int
		spendTime                    time.Duration
		expectedBackedoffControllers map[types.UID]controllerEntry
	}{
		"backing-off a controller adds its controller UID in backoff correctly": {
			backoffCounts: map[types.UID]int{
				c1: 1,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: clock.now.Add(baseBackoff),
				},
			},
		},
		"backing-off an already backed-off controller exponentially increases backoff duration": {
			backoffCounts: map[types.UID]int{
				c1: 2,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: clock.now.Add(time.Duration(float64(baseBackoff) * backoff.DefaultMultiplier)),
				},
			},
		},
		"backing-off a controller doesn't affect other controllers": {
			backoffCounts: map[types.UID]int{
				c1: 1,
				c2: 2,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: clock.now.Add(baseBackoff),
				},
				c2: {
					until: clock.now.Add(time.Duration(float64(baseBackoff) * backoff.DefaultMultiplier)),
				},
			},
		},
		"backing-off a past backed-off controller resets backoff": {
			backoffCounts: map[types.UID]int{
				c1: 1,
			},
			spendTime: baseBackoff * 2,
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					// Without a reset this would be spendTime + baseBackoff*multiplier.
					until: clock.now.Add(baseBackoff * 2).Add(baseBackoff),
				},
			},
		},
		"back-off duration doesn't exceed backoffThreshold": {
			backoffCounts: map[types.UID]int{
				c1: 15,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: clock.now.Add(backoffThreshold),
				},
			},
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			// Reset time between test cases
			clock.now = time.Time{}

			registry := NewFakePodControllerRegistry()

			for uid, backoffCount := range tc.backoffCounts {
				for i := 0; i < backoffCount; i++ {
					registry.BackoffController(uid, clock.now)
				}
			}

			// Let the earlier backoff expire, then back off again so the
			// registry has to reset the controller's backoff state.
			if tc.spendTime > 0 {
				clock.now = clock.now.Add(tc.spendTime)
				for uid := range tc.backoffCounts {
					registry.BackoffController(uid, clock.now)
				}
			}

			assert.Equal(t, len(tc.expectedBackedoffControllers), len(registry.backedOffControllers))
			for uid, expected := range tc.expectedBackedoffControllers {
				// A struct read from a map is never nil, so check presence explicitly.
				actual, found := registry.backedOffControllers[uid]
				assert.True(t, found, "controller %q should be present in the registry", uid)
				assert.Equal(t, expected.until, actual.until)
			}
		})
	}
}

// clock is a manually controlled time source for tests; its current time is
// whatever was last assigned to now.
type clock struct {
// now is the instant reported by Now.
now time.Time
}

// Now returns the time currently stored in the clock.
func (c *clock) Now() time.Time {
return c.now
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podinjection

import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
)

// EnforceInjectedPodsLimitProcessor is a PodListProcessor used to limit the number of injected fake pods.
type EnforceInjectedPodsLimitProcessor struct {
// podLimit is the target size of the pod list; Process drops fake pods
// while the list is longer than this value.
podLimit int
}

// NewEnforceInjectedPodsLimitProcessor returns an EnforceInjectedPodsLimitProcessor
// that enforces the given podLimit.
func NewEnforceInjectedPodsLimitProcessor(podLimit int) *EnforceInjectedPodsLimitProcessor {
	processor := new(EnforceInjectedPodsLimitProcessor)
	processor.podLimit = podLimit
	return processor
}

// Process returns unschedulablePods with fake pods removed, in list order,
// until the list no longer exceeds the pod limit or no fake pods remain.
// Non-fake pods are never removed, so the result may still exceed the limit.
// The returned error is always nil.
func (p *EnforceInjectedPodsLimitProcessor) Process(ctx *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
	// surplus is how many pods must still be dropped to get down to the limit;
	// it is zero or negative when the list already fits.
	surplus := len(unschedulablePods) - p.podLimit

	var kept []*apiv1.Pod
	for i := range unschedulablePods {
		candidate := unschedulablePods[i]
		if !IsFake(candidate) || surplus <= 0 {
			kept = append(kept, candidate)
			continue
		}
		// A fake pod while over the limit: drop it.
		surplus--
	}

	return kept, nil
}

// CleanUp is called at CA termination. This processor holds no resources, so
// there is nothing to release and the method is a no-op.
func (p *EnforceInjectedPodsLimitProcessor) CleanUp() {
}
Loading
Loading