Commit 54733b3
yaroslava-serdiuk committed Feb 12, 2024
1 parent 93d4bd1 commit 54733b3
Showing 6 changed files with 27 additions and 33 deletions.
10 changes: 1 addition & 9 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -341,15 +341,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
 		klog.Errorf("Failed to refresh cloud provider config: %v", err)
 		return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err)
 	}
-	if a.loopStartNotifier != nil {
-		stop, err := a.loopStartNotifier.Refresh()
-		if stop {
-			return caerrors.ToAutoscalerError(caerrors.TransientError, err)
-		}
-		if err != nil {
-			klog.Errorf("Failed to refresh loopstart observers: %v", err)
-		}
-	}
+	a.loopStartNotifier.Refresh()
 
 	// Update node groups min/max and maximum number of nodes being set for all node groups after cloud provider refresh
 	maxNodesCount := 0
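The unconditional call is safe for two reasons: every construction path now populates loopStartNotifier (see the test fixtures below), and ObserversList.Refresh guards against a missing observer list itself (see loopstart.go further down). A minimal sketch of the resulting contract, not part of the commit:

```go
package main

import "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"

func main() {
	// A list built over a nil slice, as the test fixtures below do, is still
	// a valid value, so the call site needs no nil check or error plumbing.
	notifier := loopstart.NewObserversList(nil)
	notifier.Refresh() // no-op: Refresh short-circuits when there are no observers
}
```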
11 changes: 11 additions & 0 deletions cluster-autoscaler/core/static_autoscaler_test.go
@@ -42,6 +42,7 @@ import (
 	. "k8s.io/autoscaler/cluster-autoscaler/core/test"
 	core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
+	"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
 	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
@@ -281,6 +282,7 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) {
 		scaleDownActuator:   sdActuator,
 		scaleUpOrchestrator: suOrchestrator,
 		processors:          processors,
+		loopStartNotifier:   loopstart.NewObserversList(nil),
 		processorCallbacks:  processorCallbacks,
 	}
 
@@ -374,6 +376,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
 		scaleDownActuator:   sdActuator,
 		scaleUpOrchestrator: suOrchestrator,
 		processors:          processors,
+		loopStartNotifier:   loopstart.NewObserversList(nil),
 		processorCallbacks:  processorCallbacks,
 		initialized:         true,
 	}
@@ -573,6 +576,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) {
 		scaleDownActuator:   sdActuator,
 		scaleUpOrchestrator: suOrchestrator,
 		processors:          processors,
+		loopStartNotifier:   loopstart.NewObserversList(nil),
 		processorCallbacks:  processorCallbacks,
 		initialized:         true,
 	}
@@ -798,6 +802,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
 		scaleDownActuator:   sdActuator,
 		scaleUpOrchestrator: suOrchestrator,
 		processors:          processors,
+		loopStartNotifier:   loopstart.NewObserversList(nil),
 		processorCallbacks:  processorCallbacks,
 		initialized:         true,
 	}
@@ -948,6 +953,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
 		scaleDownActuator:   sdActuator,
 		scaleUpOrchestrator: suOrchestrator,
 		processors:          processors,
+		loopStartNotifier:   loopstart.NewObserversList(nil),
 		processorCallbacks:  processorCallbacks,
 	}
 
@@ -1096,6 +1102,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
 		scaleDownActuator:   sdActuator,
 		scaleUpOrchestrator: suOrchestrator,
 		processors:          processors,
+		loopStartNotifier:   loopstart.NewObserversList(nil),
 		processorCallbacks:  processorCallbacks,
 	}
 
@@ -1224,6 +1231,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
 		scaleDownPlanner:   sdPlanner,
 		scaleDownActuator:  sdActuator,
 		processors:         processors,
+		loopStartNotifier:  loopstart.NewObserversList(nil),
 		processorCallbacks: processorCallbacks,
 	}
 
@@ -1322,6 +1330,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
 		scaleDownPlanner:   sdPlanner,
 		scaleDownActuator:  sdActuator,
 		processors:         processors,
+		loopStartNotifier:  loopstart.NewObserversList(nil),
 		processorCallbacks: processorCallbacks,
 	}
 
@@ -1427,6 +1436,7 @@ func TestStaticAutoscalerRunOnceWithUnselectedNodeGroups(t *testing.T) {
 		scaleDownPlanner:   sdPlanner,
 		scaleDownActuator:  sdActuator,
 		processors:         NewTestProcessors(&context),
+		loopStartNotifier:  loopstart.NewObserversList(nil),
 		processorCallbacks: processorCallbacks,
 	}
 
@@ -2023,6 +2033,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
 		scaleDownActuator:  actuator,
 		scaleDownPlanner:   planner,
 		processors:         NewTestProcessors(&ctx),
+		loopStartNotifier:  loopstart.NewObserversList(nil),
 		processorCallbacks: processorCallbacks,
 	}
 
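Every fixture gains the same line because RunOnce now calls Refresh() unconditionally, so a nil loopStartNotifier would panic. To assert that the refresh actually fires, a hypothetical test double (not in this diff) could stand in for the nil list:

```go
// countingObserver is a hypothetical test double for loopstart.Observer.
type countingObserver struct {
	calls int
}

// Refresh records the invocation; the simplified interface returns nothing.
func (c *countingObserver) Refresh() { c.calls++ }

// In a fixture, swap the nil argument for the fake and run one loop:
//
//	fake := &countingObserver{}
//	autoscaler.loopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{fake})
//	assert.NoError(t, autoscaler.RunOnce(time.Now()))
//	assert.Equal(t, 1, fake.calls)
```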
2 changes: 1 addition & 1 deletion cluster-autoscaler/go.mod
@@ -1,6 +1,6 @@
 module k8s.io/autoscaler/cluster-autoscaler
 
-go 1.21
+go 1.21.3
 
 require (
 	cloud.google.com/go/compute/metadata v0.2.3
16 changes: 4 additions & 12 deletions cluster-autoscaler/observers/loopstart/loopstart.go
@@ -16,12 +16,10 @@ limitations under the License.
 
 package loopstart
 
-import "errors"
-
 // Observer interface is used to store object that needed to be refreshed in each CA loop.
 // It returns error and a bool value whether the loop should be skipped.
 type Observer interface {
-	Refresh() (bool, error)
+	Refresh()
 }
 
 // ObserversList interface is used to store objects that needed to be refreshed in each CA loop.
@@ -30,19 +28,13 @@ type ObserversList struct {
 }
 
 // Refresh refresh observers each CA loop.
-func (l *ObserversList) Refresh() (stopLoop bool, err error) {
-	var resultErr error
+func (l *ObserversList) Refresh() {
 	if l.observers == nil {
-		return false, nil
+		return
 	}
 	for _, observer := range l.observers {
-		stopLoop, err := observer.Refresh()
-		if stopLoop {
-			return true, nil
-		}
-		resultErr = errors.Join(resultErr, err)
+		observer.Refresh()
 	}
-	return false, resultErr
 }
 
 // NewObserversList return new ObserversList.
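The contract is now fire-and-forget: an observer returns nothing from Refresh, so it must log or swallow its own failures, and no observer can abort or skip a loop anymore. A hedged sketch of implementing the new interface, assuming NewObserversList accepts a []loopstart.Observer as the nil argument in the tests suggests (the observer itself is hypothetical):

```go
package main

import (
	"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
	"k8s.io/klog/v2"
)

// cacheRefresher is a hypothetical observer that reloads cached state at the
// start of every autoscaler loop.
type cacheRefresher struct{}

// Refresh satisfies loopstart.Observer: errors are logged here rather than
// returned, the same pattern CombinedProvReqProcessor adopts below.
func (c *cacheRefresher) Refresh() {
	if err := reload(); err != nil {
		klog.Errorf("Failed to reload cache: %v", err)
	}
}

// reload stands in for the real per-loop work.
func reload() error { return nil }

func main() {
	notifier := loopstart.NewObserversList([]loopstart.Observer{&cacheRefresher{}})
	notifier.Refresh() // what StaticAutoscaler.RunOnce now calls each loop
}
```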
@@ -21,11 +21,12 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
 	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2"
 )
 
-// ProvisioningRequestProcessor process ProvisionignRequests in the cluster.
+// ProvisioningRequestProcessor process ProvisioningRequests in the cluster.
 type ProvisioningRequestProcessor interface {
-	Process([]*provreqwrapper.ProvisioningRequest) error
+	Process([]*provreqwrapper.ProvisioningRequest)
 	CleanUp()
 }
 
@@ -34,7 +35,8 @@ type provisioningRequestClient interface {
 	ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error)
 }
 
-// CombinedProvReqProcessor contains ProvisioningClassProcessor for each ProvisioningClass.
+// CombinedProvReqProcessor is responsible for processing ProvisioningRequest for each ProvisioningClass
+// every CA loop and updating conditions for expired ProvisioningRequests.
 type CombinedProvReqProcessor struct {
 	client     provisioningRequestClient
 	processors []ProvisioningRequestProcessor
@@ -50,17 +52,15 @@ func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []Provision
 }
 
 // Refresh iterates over ProvisioningRequests and updates its conditions/state.
-func (cp *CombinedProvReqProcessor) Refresh() (bool, error) {
+func (cp *CombinedProvReqProcessor) Refresh() {
 	provReqs, err := cp.client.ProvisioningRequests()
 	if err != nil {
-		return true, err
+		klog.Errorf("Failed to get ProvisioningRequests list, err: %v", err)
+		return
 	}
 	for _, p := range cp.processors {
-		if err := p.Process(provReqs); err != nil {
-			return false, err
-		}
+		p.Process(provReqs)
 	}
-	return false, nil
 }
 
 // CleanUp cleans up internal state
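With the (bool, error) return dropped, CombinedProvReqProcessor now satisfies the loopstart.Observer interface directly, which is presumably how it gets invoked each loop. A sketch of that wiring, written as if in the same package since the file path is not captured above; imports are elided and the constructor's (value, error) signature is an assumption:

```go
// buildNotifier shows the assumed wiring; none of this setup appears in the commit.
func buildNotifier(kubeConfig *rest.Config, processors []ProvisioningRequestProcessor) (*loopstart.ObserversList, error) {
	combined, err := NewCombinedProvReqProcessor(kubeConfig, processors)
	if err != nil {
		return nil, err
	}
	// Refresh() now matches loopstart.Observer exactly, so the combined
	// processor registers as an ordinary loop-start observer; its list errors
	// are logged inside Refresh instead of aborting RunOnce.
	return loopstart.NewObserversList([]loopstart.Observer{combined}), nil
}
```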
@@ -46,7 +46,7 @@ func NewCheckCapacityProcessor() *checkCapacityProcessor {
 // -BookingExpired condition for Provisioned ProvisioningRequest if capacity reservation time is expired.
 // -Failed condition for ProvisioningRequest that were not provisioned during defaultExpirationTime.
 // TODO(yaroslava): fetch reservation and expiration time from ProvisioningRequest
-func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) error {
+func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) {
 	expiredProvReq := []*provreqwrapper.ProvisioningRequest{}
 	failedProvReq := []*provreqwrapper.ProvisioningRequest{}
 	for _, provReq := range provReqs {
@@ -85,7 +85,6 @@ func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.Provisioning
 		setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, ExpiredReason, ExpiredMsg, metav1.NewTime(p.now()))
 		updated++
 	}
-	return nil
 }
 
 // Cleanup cleans up internal state.
