From f31042d4d6af602c1244265c134cf2059c40305e Mon Sep 17 00:00:00 2001 From: Kubernetes Prow Robot Date: Thu, 3 Mar 2022 13:01:17 -0800 Subject: [PATCH 1/6] Merge pull request #4489 from airbnb/drmorr--early-abort-if-aws-node-group-no-capacity Early abort if AWS node group has no capacity --- .../cloudprovider/aws/CA_with_AWS_IAM_OIDC.md | 1 + .../cloudprovider/aws/README.md | 1 + .../cloudprovider/aws/auto_scaling_groups.go | 134 ++++++++++++------ .../aws/auto_scaling_groups_test.go | 120 ++++++++++++++++ .../cloudprovider/aws/aws_cloud_provider.go | 27 +++- .../aws/aws_cloud_provider_test.go | 10 +- .../cloudprovider/aws/aws_manager.go | 7 +- .../cloudprovider/aws/aws_manager_test.go | 18 ++- .../cloudprovider/aws/aws_wrapper.go | 1 + .../cloudprovider/aws/aws_wrapper_test.go | 5 + .../cloudprovider/aws/instance_type_cache.go | 2 +- .../aws/instance_type_cache_test.go | 7 +- 12 files changed, 277 insertions(+), 56 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md b/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md index 217bd3d505eb..53db71aa6cc5 100644 --- a/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md +++ b/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md @@ -65,6 +65,7 @@ Note: The keys for the tags that you entered don't have values. Cluster Autoscal "autoscaling:DescribeAutoScalingGroups", "autoscaling:DescribeAutoScalingInstances", "autoscaling:DescribeLaunchConfigurations", + "autoscaling:DescribeScalingActivities", "autoscaling:DescribeTags", "autoscaling:SetDesiredCapacity", "autoscaling:TerminateInstanceInAutoScalingGroup" diff --git a/cluster-autoscaler/cloudprovider/aws/README.md b/cluster-autoscaler/cloudprovider/aws/README.md index ba71b8b67830..603abf1f8803 100644 --- a/cluster-autoscaler/cloudprovider/aws/README.md +++ b/cluster-autoscaler/cloudprovider/aws/README.md @@ -31,6 +31,7 @@ The following policy provides the minimum privileges necessary for Cluster Autos "autoscaling:DescribeAutoScalingGroups", "autoscaling:DescribeAutoScalingInstances", "autoscaling:DescribeLaunchConfigurations", + "autoscaling:DescribeScalingActivities", "autoscaling:SetDesiredCapacity", "autoscaling:TerminateInstanceInAutoScalingGroup", "ec2:DescribeInstanceTypes" diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index 1f85b0906780..f15c778d9eee 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -31,14 +31,16 @@ import ( ) const ( - scaleToZeroSupported = true - placeholderInstanceNamePrefix = "i-placeholder" + scaleToZeroSupported = true + placeholderInstanceNamePrefix = "i-placeholder" + placeholderUnfulfillableStatus = "placeholder-cannot-be-fulfilled" ) type asgCache struct { - registeredAsgs []*asg + registeredAsgs map[AwsRef]*asg asgToInstances map[AwsRef][]AwsInstanceRef instanceToAsg map[AwsInstanceRef]*asg + instanceStatus map[AwsInstanceRef]*string asgInstanceTypeCache *instanceTypeExpirationStore mutex sync.Mutex awsService *awsWrapper @@ -62,9 +64,10 @@ type mixedInstancesPolicy struct { type asg struct { AwsRef - minSize int - maxSize int - curSize int + minSize int + maxSize int + curSize int + lastUpdateTime time.Time AvailabilityZones []string LaunchConfigurationName string @@ -75,10 +78,11 @@ type asg struct { func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) 
(*asgCache, error) {
 	registry := &asgCache{
-		registeredAsgs:       make([]*asg, 0),
+		registeredAsgs:       make(map[AwsRef]*asg, 0),
 		awsService:           awsService,
 		asgToInstances:       make(map[AwsRef][]AwsInstanceRef),
 		instanceToAsg:        make(map[AwsInstanceRef]*asg),
+		instanceStatus:       make(map[AwsInstanceRef]*string),
 		asgInstanceTypeCache: newAsgInstanceTypeCache(awsService),
 		interrupt:            make(chan struct{}),
 		asgAutoDiscoverySpecs: autoDiscoverySpecs,
@@ -121,53 +125,44 @@ func (m *asgCache) parseExplicitAsgs(specs []string) error {
 
 // Register ASG. Returns the registered ASG.
 func (m *asgCache) register(asg *asg) *asg {
-	for i := range m.registeredAsgs {
-		if existing := m.registeredAsgs[i]; existing.AwsRef == asg.AwsRef {
-			if reflect.DeepEqual(existing, asg) {
-				return existing
-			}
+	if existing, asgExists := m.registeredAsgs[asg.AwsRef]; asgExists {
+		if reflect.DeepEqual(existing, asg) {
+			return existing
+		}
 
-			klog.V(4).Infof("Updating ASG %s", asg.AwsRef.Name)
+		klog.V(4).Infof("Updating ASG %s", asg.AwsRef.Name)
 
-			// Explicit registered groups should always use the manually provided min/max
-			// values and the not the ones returned by the API
-			if !m.explicitlyConfigured[asg.AwsRef] {
-				existing.minSize = asg.minSize
-				existing.maxSize = asg.maxSize
-			}
+		// Explicitly registered groups should always use the manually provided min/max
+		// values and not the ones returned by the API
+		if !m.explicitlyConfigured[asg.AwsRef] {
+			existing.minSize = asg.minSize
+			existing.maxSize = asg.maxSize
+		}
 
-			existing.curSize = asg.curSize
+		existing.curSize = asg.curSize
 
-			// Those information are mainly required to create templates when scaling
-			// from zero
-			existing.AvailabilityZones = asg.AvailabilityZones
-			existing.LaunchConfigurationName = asg.LaunchConfigurationName
-			existing.LaunchTemplate = asg.LaunchTemplate
-			existing.MixedInstancesPolicy = asg.MixedInstancesPolicy
-			existing.Tags = asg.Tags
+		// This information is mainly required to create templates when scaling
+		// from zero
+		existing.AvailabilityZones = asg.AvailabilityZones
+		existing.LaunchConfigurationName = asg.LaunchConfigurationName
+		existing.LaunchTemplate = asg.LaunchTemplate
+		existing.MixedInstancesPolicy = asg.MixedInstancesPolicy
+		existing.Tags = asg.Tags
 
-			return existing
-		}
+		return existing
 	}
 
 	klog.V(1).Infof("Registering ASG %s", asg.AwsRef.Name)
-	m.registeredAsgs = append(m.registeredAsgs, asg)
+	m.registeredAsgs[asg.AwsRef] = asg
 	return asg
 }
 
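The switch from a slice to a map keyed by `AwsRef` is what lets `register` and `unregister` collapse into keyed lookups. A minimal, self-contained sketch of the same pattern, using invented `ref` and `group` types rather than the real `AwsRef`/`asg` definitions:

```go
package main

import "fmt"

// ref and group are toy stand-ins for the patch's AwsRef and asg types.
type ref struct{ Name string }
type group struct {
	ref     ref
	curSize int
}

type cache struct {
	registered map[ref]*group // keyed lookup replaces the O(n) slice scan
}

// register updates an existing entry in place, or stores a new one.
func (c *cache) register(g *group) *group {
	if existing, ok := c.registered[g.ref]; ok {
		existing.curSize = g.curSize
		return existing
	}
	c.registered[g.ref] = g
	return g
}

// unregister deletes by key; delete is a no-op if the key is absent.
func (c *cache) unregister(g *group) {
	delete(c.registered, g.ref)
}

func main() {
	c := &cache{registered: map[ref]*group{}}
	c.register(&group{ref: ref{"asg-a"}, curSize: 1})
	c.register(&group{ref: ref{"asg-a"}, curSize: 3}) // updates, does not append a duplicate
	fmt.Println(len(c.registered), c.registered[ref{"asg-a"}].curSize) // 1 3
	c.unregister(&group{ref: ref{"asg-a"}})
	fmt.Println(len(c.registered)) // 0
}
```

Registering the same `AwsRef` twice now updates the entry in place, which is the invariant the rest of the cache relies on.

 // Unregister ASG. Returns the unregistered ASG.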
func (m *asgCache) unregister(a *asg) *asg { - updated := make([]*asg, 0, len(m.registeredAsgs)) - var changed *asg - for _, existing := range m.registeredAsgs { - if existing.AwsRef == a.AwsRef { - klog.V(1).Infof("Unregistered ASG %s", a.AwsRef.Name) - changed = a - continue - } - updated = append(updated, existing) + if _, asgExists := m.registeredAsgs[a.AwsRef]; asgExists { + klog.V(1).Infof("Unregistered ASG %s", a.AwsRef.Name) + delete(m.registeredAsgs, a.AwsRef) } - m.registeredAsgs = updated - return changed + return a } func (m *asgCache) buildAsgFromSpec(spec string) (*asg, error) { @@ -184,7 +179,7 @@ func (m *asgCache) buildAsgFromSpec(spec string) (*asg, error) { } // Get returns the currently registered ASGs -func (m *asgCache) Get() []*asg { +func (m *asgCache) Get() map[AwsRef]*asg { m.mutex.Lock() defer m.mutex.Unlock() @@ -226,6 +221,17 @@ func (m *asgCache) InstancesByAsg(ref AwsRef) ([]AwsInstanceRef, error) { return nil, fmt.Errorf("error while looking for instances of ASG: %s", ref) } +func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if status, found := m.instanceStatus[ref]; found { + return status, nil + } + + return nil, fmt.Errorf("could not find instance %v", ref) +} + func (m *asgCache) SetAsgSize(asg *asg, size int) error { m.mutex.Lock() defer m.mutex.Unlock() @@ -248,6 +254,7 @@ func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error { } // Proactively set the ASG size so autoscaler makes better decisions + asg.lastUpdateTime = start asg.curSize = size return nil @@ -367,6 +374,7 @@ func (m *asgCache) regenerate() error { newInstanceToAsgCache := make(map[AwsInstanceRef]*asg) newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef) + newInstanceStatusMap := make(map[AwsInstanceRef]*string) // Build list of known ASG names refreshNames, err := m.buildAsgNames() @@ -403,6 +411,7 @@ func (m *asgCache) regenerate() error { ref := m.buildInstanceRefFromAWS(instance) newInstanceToAsgCache[ref] = asg newAsgToInstancesCache[asg.AwsRef][i] = ref + newInstanceStatusMap[ref] = instance.HealthStatus } } @@ -431,6 +440,7 @@ func (m *asgCache) regenerate() error { m.asgToInstances = newAsgToInstancesCache m.instanceToAsg = newInstanceToAsgCache m.autoscalingOptions = newAutoscalingOptions + m.instanceStatus = newInstanceStatusMap return nil } @@ -444,17 +454,57 @@ func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*aut klog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. 
"+ "Creating placeholder instances.", *g.AutoScalingGroupName, realInstances, desired) + + healthStatus := "" + isAvailable, err := m.isNodeGroupAvailable(g) + if err != nil { + klog.V(4).Infof("Could not check instance availability, creating placeholder node anyways: %v", err) + } else if !isAvailable { + klog.Warningf("Instance group %s cannot provision any more nodes!", *g.AutoScalingGroupName) + healthStatus = placeholderUnfulfillableStatus + } + for i := realInstances; i < desired; i++ { id := fmt.Sprintf("%s-%s-%d", placeholderInstanceNamePrefix, *g.AutoScalingGroupName, i) g.Instances = append(g.Instances, &autoscaling.Instance{ InstanceId: &id, AvailabilityZone: g.AvailabilityZones[0], + HealthStatus: &healthStatus, }) } } return groups } +func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) { + input := &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: group.AutoScalingGroupName, + } + + start := time.Now() + response, err := m.awsService.DescribeScalingActivities(input) + observeAWSRequest("DescribeScalingActivities", err, start) + if err != nil { + return true, err // If we can't describe the scaling activities we assume the node group is available + } + + for _, activity := range response.Activities { + asgRef := AwsRef{Name: *group.AutoScalingGroupName} + if a, ok := m.registeredAsgs[asgRef]; ok { + lut := a.lastUpdateTime + if activity.StartTime.Before(lut) { + break + } else if *activity.StatusCode == "Failed" { + klog.Warningf("ASG %s scaling failed with %s", asgRef.Name, *activity) + return false, nil + } + } else { + klog.V(4).Infof("asg %v is not registered yet, skipping DescribeScalingActivities check", asgRef.Name) + } + } + return true, nil +} + func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) { spec := dynamic.NodeGroupSpec{ Name: aws.StringValue(g.AutoScalingGroupName), diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go index e9f9ad1ab79e..c5078b03b5b7 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go @@ -17,8 +17,12 @@ limitations under the License. 
package aws import ( + "errors" "testing" + "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/stretchr/testify/assert" ) @@ -46,3 +50,119 @@ func validateAsg(t *testing.T, asg *asg, name string, minSize int, maxSize int) assert.Equal(t, minSize, asg.minSize) assert.Equal(t, maxSize, asg.maxSize) } + +func TestCreatePlaceholders(t *testing.T) { + registeredAsgName := aws.String("test-asg") + registeredAsgRef := AwsRef{Name: *registeredAsgName} + + cases := []struct { + name string + desiredCapacity *int64 + activities []*autoscaling.Activity + groupLastUpdateTime time.Time + describeErr error + asgToCheck *string + }{ + { + name: "add placeholders successful", + desiredCapacity: aws.Int64(10), + }, + { + name: "no placeholders needed", + desiredCapacity: aws.Int64(0), + }, + { + name: "DescribeScalingActivities failed", + desiredCapacity: aws.Int64(1), + describeErr: errors.New("timeout"), + }, + { + name: "early abort if AWS scaling up fails", + desiredCapacity: aws.Int64(1), + activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StartTime: aws.Time(time.Unix(10, 0)), + }, + }, + groupLastUpdateTime: time.Unix(9, 0), + }, + { + name: "AWS scaling failed event before CA scale_up", + desiredCapacity: aws.Int64(1), + activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StartTime: aws.Time(time.Unix(9, 0)), + }, + }, + groupLastUpdateTime: time.Unix(10, 0), + }, + { + name: "asg not registered", + desiredCapacity: aws.Int64(10), + activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StartTime: aws.Time(time.Unix(10, 0)), + }, + }, + groupLastUpdateTime: time.Unix(9, 0), + asgToCheck: aws.String("unregisteredAsgName"), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + shouldCallDescribeScalingActivities := true + if *tc.desiredCapacity == int64(0) { + shouldCallDescribeScalingActivities = false + } + + asgName := registeredAsgName + if tc.asgToCheck != nil { + asgName = tc.asgToCheck + } + + a := &autoScalingMock{} + if shouldCallDescribeScalingActivities { + a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: asgName, + }).Return( + &autoscaling.DescribeScalingActivitiesOutput{Activities: tc.activities}, + tc.describeErr, + ).Once() + } + + asgCache := &asgCache{ + awsService: &awsWrapper{ + autoScalingI: a, + ec2I: nil, + }, + registeredAsgs: map[AwsRef]*asg{ + registeredAsgRef: { + AwsRef: registeredAsgRef, + lastUpdateTime: tc.groupLastUpdateTime, + }, + }, + } + + groups := []*autoscaling.Group{ + { + AutoScalingGroupName: asgName, + AvailabilityZones: []*string{aws.String("westeros-1a")}, + DesiredCapacity: tc.desiredCapacity, + Instances: []*autoscaling.Instance{}, + }, + } + asgCache.createPlaceholdersForDesiredNonStartedInstances(groups) + assert.Equal(t, int64(len(groups[0].Instances)), *tc.desiredCapacity) + if tc.activities != nil && *tc.activities[0].StatusCode == "Failed" && tc.activities[0].StartTime.After(tc.groupLastUpdateTime) && asgName == registeredAsgName { + assert.Equal(t, *groups[0].Instances[0].HealthStatus, placeholderUnfulfillableStatus) + } else if len(groups[0].Instances) > 0 { + assert.Equal(t, *groups[0].Instances[0].HealthStatus, "") + } + a.AssertExpectations(t) + }) + } +} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go index 4b59da99909b..9351b2f8da57 100644 --- 
a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go @@ -86,12 +86,12 @@ func (aws *awsCloudProvider) GetAvailableGPUTypes() map[string]struct{} { // NodeGroups returns all node groups configured for this cloud provider. func (aws *awsCloudProvider) NodeGroups() []cloudprovider.NodeGroup { asgs := aws.awsManager.getAsgs() - ngs := make([]cloudprovider.NodeGroup, len(asgs)) - for i, asg := range asgs { - ngs[i] = &AwsNodeGroup{ + ngs := make([]cloudprovider.NodeGroup, 0, len(asgs)) + for _, asg := range asgs { + ngs = append(ngs, &AwsNodeGroup{ asg: asg, awsManager: aws.awsManager, - } + }) } return ngs @@ -320,7 +320,24 @@ func (ng *AwsNodeGroup) Nodes() ([]cloudprovider.Instance, error) { instances := make([]cloudprovider.Instance, len(asgNodes)) for i, asgNode := range asgNodes { - instances[i] = cloudprovider.Instance{Id: asgNode.ProviderID} + var status *cloudprovider.InstanceStatus + instanceStatusString, err := ng.awsManager.GetInstanceStatus(asgNode) + if err != nil { + klog.V(4).Infof("Could not get instance status, continuing anyways: %v", err) + } else if instanceStatusString != nil && *instanceStatusString == placeholderUnfulfillableStatus { + status = &cloudprovider.InstanceStatus{ + State: cloudprovider.InstanceCreating, + ErrorInfo: &cloudprovider.InstanceErrorInfo{ + ErrorClass: cloudprovider.OutOfResourcesErrorClass, + ErrorCode: placeholderUnfulfillableStatus, + ErrorMessage: "AWS cannot provision any more instances for this node group", + }, + } + } + instances[i] = cloudprovider.Instance{ + Id: asgNode.ProviderID, + Status: status, + } } return instances, nil } diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index a43dcfaeaee0..474a599603d8 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -31,7 +31,7 @@ import ( var testAwsManager = &AwsManager{ asgCache: &asgCache{ - registeredAsgs: make([]*asg, 0), + registeredAsgs: make(map[AwsRef]*asg, 0), asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), interrupt: make(chan struct{}), @@ -45,7 +45,7 @@ func newTestAwsManagerWithMockServices(mockAutoScaling autoScalingI, mockEC2 ec2 return &AwsManager{ awsService: awsService, asgCache: &asgCache{ - registeredAsgs: make([]*asg, 0), + registeredAsgs: make(map[AwsRef]*asg, 0), asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), asgInstanceTypeCache: newAsgInstanceTypeCache(&awsService), @@ -482,6 +482,12 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { expectedInstancesCount = 1 }).Return(nil) + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) + provider.Refresh() initialSize, err := asgs[0].TargetSize() diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index 6d4651566890..63a4a4395eb7 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -273,7 +273,7 @@ func (m *AwsManager) Cleanup() { m.asgCache.Cleanup() } -func (m *AwsManager) getAsgs() []*asg { +func (m *AwsManager) getAsgs() map[AwsRef]*asg { return m.asgCache.Get() } @@ -301,6 
+301,11 @@ func (m *AwsManager) GetAsgNodes(ref AwsRef) ([]AwsInstanceRef, error) { return m.asgCache.InstancesByAsg(ref) } +// GetInstanceStatus returns the status of ASG nodes +func (m *AwsManager) GetInstanceStatus(ref AwsInstanceRef) (*string, error) { + return m.asgCache.InstanceStatus(ref) +} + func (m *AwsManager) getAsgTemplate(asg *asg) (*asgTemplate, error) { if len(asg.AvailabilityZones) < 1 { return nil, fmt.Errorf("unable to get first AvailabilityZone for ASG %q", asg.Name) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go index a6f649292597..295bbf0e92d2 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go @@ -358,6 +358,7 @@ func makeTaintSet(taints []apiv1.Taint) map[apiv1.Taint]bool { func TestFetchExplicitAsgs(t *testing.T) { min, max, groupname := 1, 10, "coolasg" + asgRef := AwsRef{Name: groupname} a := &autoScalingMock{} a.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{ @@ -390,6 +391,12 @@ func TestFetchExplicitAsgs(t *testing.T) { }}, false) }).Return(nil) + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("coolasg"), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) + do := cloudprovider.NodeGroupDiscoveryOptions{ // Register the same node group twice with different max nodes. // The intention is to test that the asgs.Register method will update @@ -408,7 +415,7 @@ func TestFetchExplicitAsgs(t *testing.T) { asgs := m.asgCache.Get() assert.Equal(t, 1, len(asgs)) - validateAsg(t, asgs[0], groupname, min, max) + validateAsg(t, asgs[asgRef], groupname, min, max) } func TestGetASGTemplate(t *testing.T) { @@ -504,6 +511,7 @@ func TestGetASGTemplate(t *testing.T) { func TestFetchAutoAsgs(t *testing.T) { min, max := 1, 10 groupname, tags := "coolasg", []string{"tag", "anothertag"} + asgRef := AwsRef{Name: groupname} a := &autoScalingMock{} // Lookup groups associated with tags @@ -547,6 +555,12 @@ func TestFetchAutoAsgs(t *testing.T) { }}}, false) }).Return(nil).Twice() + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("coolasg"), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) + do := cloudprovider.NodeGroupDiscoveryOptions{ NodeGroupAutoDiscoverySpecs: []string{fmt.Sprintf("asg:tag=%s", strings.Join(tags, ","))}, } @@ -561,7 +575,7 @@ func TestFetchAutoAsgs(t *testing.T) { asgs := m.asgCache.Get() assert.Equal(t, 1, len(asgs)) - validateAsg(t, asgs[0], groupname, min, max) + validateAsg(t, asgs[asgRef], groupname, min, max) // Simulate the previously discovered ASG disappearing a.On("DescribeTagsPages", mock.MatchedBy(tagsMatcher(expectedTagsInput)), diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go index 86453300b757..1c7ef4eaa1ce 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go @@ -33,6 +33,7 @@ import ( type autoScalingI interface { DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error DescribeLaunchConfigurations(*autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error) + 
DescribeScalingActivities(*autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error) DescribeTagsPages(input *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error) TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go index 4fc2778db03e..e360f341c8b6 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go @@ -45,6 +45,11 @@ func (a *autoScalingMock) DescribeLaunchConfigurations(i *autoscaling.DescribeLa return args.Get(0).(*autoscaling.DescribeLaunchConfigurationsOutput), nil } +func (a *autoScalingMock) DescribeScalingActivities(i *autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error) { + args := a.Called(i) + return args.Get(0).(*autoscaling.DescribeScalingActivitiesOutput), args.Error(1) +} + func (a *autoScalingMock) DescribeTagsPages(i *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error { args := a.Called(i, fn) return args.Error(0) diff --git a/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go b/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go index 2b2ce74c8044..90c7f2fd2c4d 100644 --- a/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go +++ b/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go @@ -84,7 +84,7 @@ func (c *jitterClock) Since(ts time.Time) time.Duration { return since } -func (es instanceTypeExpirationStore) populate(autoscalingGroups []*asg) error { +func (es instanceTypeExpirationStore) populate(autoscalingGroups map[AwsRef]*asg) error { asgsToQuery := []*asg{} if c, ok := es.jitterClock.(*jitterClock); ok { diff --git a/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go b/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go index 94d0018e20cc..88cc75f087a8 100644 --- a/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go +++ b/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go @@ -79,9 +79,10 @@ func TestLTVersionChange(t *testing.T) { m := newAsgInstanceTypeCacheWithClock(&awsWrapper{a, e, nil}, fakeClock, fakeStore) for i := 0; i < 2; i++ { - err := m.populate([]*asg{ - { - AwsRef: AwsRef{Name: asgName}, + asgRef := AwsRef{Name: asgName} + err := m.populate(map[AwsRef]*asg{ + asgRef: { + AwsRef: asgRef, LaunchTemplate: &launchTemplate{ name: ltName, version: aws.StringValue(ltVersions[i]), From 1ad7ded0075cc9bb85143569d30bccb3d30d8a1d Mon Sep 17 00:00:00 2001 From: Kubernetes Prow Robot Date: Mon, 21 Feb 2022 06:54:14 -0800 Subject: [PATCH 2/6] Merge pull request #4452 from airbnb/es--grpc-expander-plugin Add gRPC expander plugin --- .../config/autoscaling_options.go | 4 + cluster-autoscaler/core/autoscaler.go | 4 +- cluster-autoscaler/expander/expander.go | 4 +- .../expander/factory/expander_factory.go | 7 +- .../expander/grpcplugin/README.md | 41 ++ .../grpcplugin/example/fake_grpc_server.go | 104 ++++ .../expander/grpcplugin/example/main.go | 30 ++ .../expander/grpcplugin/grpc_client.go | 143 +++++ .../expander/grpcplugin/grpc_client_test.go | 276 ++++++++++ 
.../expander/grpcplugin/protos/expander.pb.go | 440 ++++++++++++++++ .../expander/grpcplugin/protos/expander.proto | 30 ++ .../expander/mocks/GRPCPluginExpander.go | 107 ++++ cluster-autoscaler/go.mod | 2 + cluster-autoscaler/main.go | 5 + .../golang/mock/mockgen/model/model.go | 495 ++++++++++++++++++ cluster-autoscaler/vendor/modules.txt | 3 + hack/verify-golint.sh | 1 + 17 files changed, 1691 insertions(+), 5 deletions(-) create mode 100644 cluster-autoscaler/expander/grpcplugin/README.md create mode 100644 cluster-autoscaler/expander/grpcplugin/example/fake_grpc_server.go create mode 100644 cluster-autoscaler/expander/grpcplugin/example/main.go create mode 100644 cluster-autoscaler/expander/grpcplugin/grpc_client.go create mode 100644 cluster-autoscaler/expander/grpcplugin/grpc_client_test.go create mode 100644 cluster-autoscaler/expander/grpcplugin/protos/expander.pb.go create mode 100644 cluster-autoscaler/expander/grpcplugin/protos/expander.proto create mode 100644 cluster-autoscaler/expander/mocks/GRPCPluginExpander.go create mode 100644 cluster-autoscaler/vendor/github.com/golang/mock/mockgen/model/model.go diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 0843c289197a..0d60b6f2f356 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -71,6 +71,10 @@ type AutoscalingOptions struct { EstimatorName string // ExpanderNames sets the chain of node group expanders to be used in scale up ExpanderNames string + // GRPCExpanderCert is the location of the cert passed to the gRPC server for TLS when using the gRPC expander + GRPCExpanderCert string + // GRPCExpanderURL is the url of the gRPC server when using the gRPC expander + GRPCExpanderURL string // IgnoreDaemonSetsUtilization is whether CA will ignore DaemonSet pods when calculating resource utilization for scaling down IgnoreDaemonSetsUtilization bool // IgnoreMirrorPodsUtilization is whether CA will ignore Mirror pods when calculating resource utilization for scaling down diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 64c7ef74e190..62216018769d 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -102,8 +102,8 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error { opts.CloudProvider = cloudBuilder.NewCloudProvider(opts.AutoscalingOptions) } if opts.ExpanderStrategy == nil { - expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","), - opts.CloudProvider, opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace) + expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","), opts.CloudProvider, + opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace, opts.GRPCExpanderCert, opts.GRPCExpanderURL) if err != nil { return err } diff --git a/cluster-autoscaler/expander/expander.go b/cluster-autoscaler/expander/expander.go index 7558bf428c49..57a91cfa78e7 100644 --- a/cluster-autoscaler/expander/expander.go +++ b/cluster-autoscaler/expander/expander.go @@ -24,7 +24,7 @@ import ( var ( // AvailableExpanders is a list of available expander options - AvailableExpanders = []string{RandomExpanderName, MostPodsExpanderName, LeastWasteExpanderName, PriceBasedExpanderName, PriorityBasedExpanderName} + AvailableExpanders = []string{RandomExpanderName, MostPodsExpanderName, LeastWasteExpanderName, 
PriceBasedExpanderName, PriorityBasedExpanderName, GRPCExpanderName}
 	// RandomExpanderName selects a node group at random
 	RandomExpanderName = "random"
 	// MostPodsExpanderName selects a node group that fits the most pods
@@ -36,6 +36,8 @@ var (
 	PriceBasedExpanderName = "price"
 	// PriorityBasedExpanderName selects a node group based on a user-configured priorities assigned to group names
 	PriorityBasedExpanderName = "priority"
+	// GRPCExpanderName uses the gRPC client expander to call out to an external gRPC server to select a node group for scale up
+	GRPCExpanderName = "grpc"
 )
 
 // Option describes an option to expand the cluster.
diff --git a/cluster-autoscaler/expander/factory/expander_factory.go b/cluster-autoscaler/expander/factory/expander_factory.go
index 485928032cdb..a0e0b7fe0d5a 100644
--- a/cluster-autoscaler/expander/factory/expander_factory.go
+++ b/cluster-autoscaler/expander/factory/expander_factory.go
@@ -20,6 +20,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/expander"
+	"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin"
 	"k8s.io/autoscaler/cluster-autoscaler/expander/mostpods"
 	"k8s.io/autoscaler/cluster-autoscaler/expander/price"
 	"k8s.io/autoscaler/cluster-autoscaler/expander/priority"
@@ -27,14 +28,14 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/expander/waste"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
-
 	kube_client "k8s.io/client-go/kubernetes"
 )
 
 // ExpanderStrategyFromStrings creates an expander.Strategy according to the names of the expanders passed in
+// TODO: consider taking in the whole AutoscalingOptions and accessing the needed fields here
 func ExpanderStrategyFromStrings(expanderFlags []string, cloudProvider cloudprovider.CloudProvider,
 	autoscalingKubeClients *context.AutoscalingKubeClients, kubeClient kube_client.Interface,
-	configNamespace string) (expander.Strategy, errors.AutoscalerError) {
+	configNamespace string, GRPCExpanderCert string, GRPCExpanderURL string) (expander.Strategy, errors.AutoscalerError) {
 	var filters []expander.Filter
 	seenExpanders := map[string]struct{}{}
 	strategySeen := false
@@ -67,6 +68,8 @@ func ExpanderStrategyFromStrings(expanderFlags []string, cloudProvider cloudprov
 			stopChannel := make(chan struct{})
 			lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace)
 			filters = append(filters, priority.NewFilter(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder))
+		case expander.GRPCExpanderName:
+			filters = append(filters, grpcplugin.NewFilter(GRPCExpanderCert, GRPCExpanderURL))
 		default:
 			return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag)
 		}
diff --git a/cluster-autoscaler/expander/grpcplugin/README.md b/cluster-autoscaler/expander/grpcplugin/README.md
new file mode 100644
index 000000000000..4fd24408fd21
--- /dev/null
+++ b/cluster-autoscaler/expander/grpcplugin/README.md
@@ -0,0 +1,41 @@
+# gRPC Expander for Cluster Autoscaler
+
+## Introduction
+This expander functions as a gRPC client, and will pass expansion options to an external gRPC server.
+The external server will use this information to make a decision on which Node Group to expand, and return an option to expand.
+
+## Motivation
+
+This expander gives users very fine-grained control over which option they'd like to expand.
+The gRPC server must be implemented by the user, but the logic can be developed out of band with Cluster Autoscaler.
+There are a wide variety of use cases here. Some examples are as follows:
+* A tiered weighted random strategy can be implemented, instead of the static priority ladder offered by the priority expander; a sketch of this idea follows below.
+* A strategy that encapsulates business logic specific to a user but not to all users of Cluster Autoscaler
+* A strategy that takes into account the dynamically fluctuating prices of the spot instance market
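As a concrete sketch of the first bullet (illustrative only; the `option` type and the weights below are invented, not Cluster Autoscaler types), a tiered weighted random pick might look like:

```go
package main

import (
	"fmt"
	"math/rand"
)

// option is a toy stand-in for an expansion option, tagged with a weight
// that a tiered strategy might assign per node group (e.g. spot vs on-demand).
type option struct {
	nodeGroupID string
	weight      int // must be positive
}

// pickWeighted returns one option with probability proportional to its weight.
func pickWeighted(opts []option, rng *rand.Rand) option {
	total := 0
	for _, o := range opts {
		total += o.weight
	}
	n := rng.Intn(total)
	for _, o := range opts {
		if n < o.weight {
			return o
		}
		n -= o.weight
	}
	return opts[len(opts)-1] // unreachable while all weights are positive
}

func main() {
	rng := rand.New(rand.NewSource(42))
	opts := []option{{"spot-asg", 8}, {"on-demand-asg", 2}}
	fmt.Println(pickWeighted(opts, rng).nodeGroupID) // "spot-asg" roughly 80% of the time
}
```

A real server would map each incoming Option to its tier, draw from the weighted distribution, and return the winner in its BestOptionsResponse.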
+
+## Configuration options
+As using this expander requires communication with another service, users must specify a few options as CLI arguments.
+
+```yaml
+--grpcExpanderUrl
+```
+URL of the gRPC Expander server, for CA to communicate with.
+```yaml
+--grpcExpanderCert
+```
+Location of the volume-mounted certificate of the gRPC server, if it is configured to communicate over TLS.
+
+## gRPC Expander Server Setup
+The gRPC server can be set up in many ways, but a simple example is described below.
+An example of a barebones gRPC Expander Server can be found in the `example` directory, in the `fake_grpc_server.go` file. This is meant to be copied elsewhere and deployed as a separate
+service. Note that the `protos/expander.pb.go` generated protobuf code will also need to be copied and used to serialize/deserialize the Options passed from CA.
+Communication between Cluster Autoscaler and the gRPC Server will occur over native kube-proxy. To use this, note the Service and Namespace the gRPC server is deployed in.
+
+Deploy the gRPC Expander Server as a separate app, listening on a specific port number.
+Start Cluster Autoscaler with the `--grpcExpanderUrl=SERVICE_NAME.NAMESPACE_NAME.svc.cluster.local:PORT_NUMBER` flag, as well as `--grpcExpanderCert` pointed at the location of the volume-mounted certificate of the gRPC server.
+
+## Details
+
+The gRPC client currently transforms nodeInfo objects passed into the expander into v1.Node objects to save RPC call throughput. As such, the gRPC server will not have access to DaemonSets and static pods running on each node.
+
+
diff --git a/cluster-autoscaler/expander/grpcplugin/example/fake_grpc_server.go b/cluster-autoscaler/expander/grpcplugin/example/fake_grpc_server.go
new file mode 100644
index 000000000000..05fd5f47f06b
--- /dev/null
+++ b/cluster-autoscaler/expander/grpcplugin/example/fake_grpc_server.go
@@ -0,0 +1,104 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package example
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"net"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
+	"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
+)
+
+// This code is meant to be used as starter code, deployed as a separate app, not in Cluster Autoscaler.
+// This serves as the gRPC Expander Server counterpart to the client which lives in this repo.
+// main.go of said application should simply pass in paths to an (optional) cert, an (optional) private key, and a port, and call Serve to start listening.
+// Copy protos/expander.pb.go to your other application's repo, so it has access to the protobuf definitions.
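For contrast with the longest-name starter strategy implemented below, here is a sketch of a different policy plugged into the same server shape. The `protos` types are the ones added by this patch; the `FewestNodesServer` type and its policy are invented for illustration:

```go
package example

import (
	"context"

	"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
)

// FewestNodesServer is a hypothetical alternative strategy that returns the
// option requesting the fewest new nodes.
type FewestNodesServer struct{}

// BestOptions has the same signature RegisterExpanderServer expects, so this
// type can be registered in place of ExpanderServerImpl.
func (s *FewestNodesServer) BestOptions(ctx context.Context, req *protos.BestOptionsRequest) (*protos.BestOptionsResponse, error) {
	var choice *protos.Option
	for _, opt := range req.GetOptions() {
		if opt == nil {
			continue // be defensive about malformed requests
		}
		if choice == nil || opt.GetNodeCount() < choice.GetNodeCount() {
			choice = opt
		}
	}
	if choice == nil {
		// An empty response causes the client to keep all options unfiltered.
		return &protos.BestOptionsResponse{}, nil
	}
	return &protos.BestOptionsResponse{Options: []*protos.Option{choice}}, nil
}
```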
+
+// Serve should be called by the main() function in main.go of the Expander Server repo to start serving
+func Serve(certPath string, keyPath string, port uint) {
+
+	var grpcServer *grpc.Server
+
+	// If credentials are passed in, use them
+	if certPath != "" && keyPath != "" {
+		log.Printf("Using certFile: %v and keyFile: %v", certPath, keyPath)
+		tlsCredentials, err := credentials.NewServerTLSFromFile(certPath, keyPath)
+		if err != nil {
+			log.Fatal("cannot load TLS credentials: ", err)
+		}
+		grpcServer = grpc.NewServer(grpc.Creds(tlsCredentials))
+	} else {
+		grpcServer = grpc.NewServer()
+	}
+
+	netListener := getNetListener(port)
+
+	expanderServerImpl := NewExpanderServerImpl()
+
+	protos.RegisterExpanderServer(grpcServer, expanderServerImpl)
+
+	// start the server
+	log.Println("Starting server on port ", port)
+	if err := grpcServer.Serve(netListener); err != nil {
+		log.Fatalf("failed to serve: %s", err)
+	}
+}
+
+func getNetListener(port uint) net.Listener {
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
+	if err != nil {
+		log.Fatalf("failed to listen: %v", err)
+		panic(fmt.Sprintf("failed to listen: %v", err))
+	}
+
+	return lis
+}
+
+// ExpanderServerImpl is an implementation of Expander Server from proto definition
+type ExpanderServerImpl struct{}
+
+// NewExpanderServerImpl is this Expander's implementation of the server
+func NewExpanderServerImpl() *ExpanderServerImpl {
+	return &ExpanderServerImpl{}
+}
+
+// BestOptions method filters out the best options of all options passed from the gRPC Client in CA, according to the defined strategy.
+func (ServerImpl *ExpanderServerImpl) BestOptions(ctx context.Context, req *protos.BestOptionsRequest) (*protos.BestOptionsResponse, error) {
+	opts := req.GetOptions()
+	log.Printf("Received BestOption Request with %v options", len(opts))
+
+	// This strategy simply chooses the Option with the longest NodeGroupID name, but can be replaced with any arbitrary logic
+	longest := 0
+	var choice *protos.Option
+	for _, opt := range opts {
+		log.Println(opt.NodeGroupId)
+		if l := len(opt.NodeGroupId); l > longest {
+			longest, choice = l, opt
+		}
+	}
+
+	log.Print("returned bestOptions with option: ", choice.NodeGroupId)
+
+	// Return just one option for now
+	return &protos.BestOptionsResponse{
+		Options: []*protos.Option{choice},
+	}, nil
+}
diff --git a/cluster-autoscaler/expander/grpcplugin/example/main.go b/cluster-autoscaler/expander/grpcplugin/example/main.go
new file mode 100644
index 000000000000..5401416baa01
--- /dev/null
+++ b/cluster-autoscaler/expander/grpcplugin/example/main.go
@@ -0,0 +1,30 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package example
+
+import "flag"
+
+func main() {
+
+	certPath := flag.String("cert-path", "", "Path to cert file for gRPC Expander Server")
+	keyPath := flag.String("key-path", "", "Path to private key for gRPC Expander Server")
+	port := flag.Uint("port", 7000, "Port number for server to listen on")
+
+	flag.Parse()
+
+	Serve(*certPath, *keyPath, *port)
+}
diff --git a/cluster-autoscaler/expander/grpcplugin/grpc_client.go b/cluster-autoscaler/expander/grpcplugin/grpc_client.go
new file mode 100644
index 000000000000..7800bd270136
--- /dev/null
+++ b/cluster-autoscaler/expander/grpcplugin/grpc_client.go
@@ -0,0 +1,143 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package grpcplugin
+
+import (
+	"context"
+	"log"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/expander"
+	"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
+	"k8s.io/klog/v2"
+	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
+)
+
+const gRPCTimeout = 5 * time.Second
+
+type grpcclientstrategy struct {
+	grpcClient protos.ExpanderClient
+}
+
+// NewFilter returns an expansion filter that creates a gRPC client, and calls out to a gRPC server
+func NewFilter(expanderCert string, expanderUrl string) expander.Filter {
+	client := createGRPCClient(expanderCert, expanderUrl)
+	if client == nil {
+		return &grpcclientstrategy{grpcClient: nil}
+	}
+	return &grpcclientstrategy{grpcClient: client}
+}
+
+func createGRPCClient(expanderCert string, expanderUrl string) protos.ExpanderClient {
+	var dialOpt grpc.DialOption
+
+	if expanderCert == "" {
+		log.Fatalf("GRPC Expander Cert not specified, insecure connections not allowed")
+		return nil
+	}
+	creds, err := credentials.NewClientTLSFromFile(expanderCert, "")
+	if err != nil {
+		log.Fatalf("Failed to create TLS credentials %v", err)
+		return nil
+	}
+	dialOpt = grpc.WithTransportCredentials(creds)
+	klog.V(2).Infof("Dialing: %s with dialopt: %v", expanderUrl, dialOpt)
+	conn, err := grpc.Dial(expanderUrl, dialOpt)
+	if err != nil {
+		log.Fatalf("Failed to dial server: %v", err)
+		return nil
+	}
+	return protos.NewExpanderClient(conn)
+}
+
+func (g *grpcclientstrategy) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
+	if g.grpcClient == nil {
+		klog.Errorf("Incorrect gRPC client config, filtering no options")
+		return expansionOptions
+	}
+
+	// Transform inputs to gRPC inputs
+	grpcOptionsSlice, nodeGroupIDOptionMap := populateOptionsForGRPC(expansionOptions)
+	grpcNodeMap := populateNodeInfoForGRPC(nodeInfo)
+
+	// call gRPC server to get BestOption
+	klog.V(2).Infof("gRPC call of best options to server with %v options", len(nodeGroupIDOptionMap))
+	ctx, cancel := context.WithTimeout(context.Background(), gRPCTimeout)
+	defer cancel()
+	bestOptionsResponse, err := g.grpcClient.BestOptions(ctx, &protos.BestOptionsRequest{Options: grpcOptionsSlice, NodeMap: grpcNodeMap})
+	if err != nil {
+		klog.V(4).Infof("GRPC call failed: %v, no options filtered", err)
+		return expansionOptions
+	}
+
+	if bestOptionsResponse == nil || bestOptionsResponse.Options == nil {
+		klog.V(4).Info("GRPC returned nil bestOptions, no options filtered")
+		return expansionOptions
+	}
+	// Transform back options slice
+	options := transformAndSanitizeOptionsFromGRPC(bestOptionsResponse.Options, nodeGroupIDOptionMap)
+	if options == nil {
+		klog.V(4).Info("Unable to sanitize gRPC returned bestOptions, no options filtered")
+		return expansionOptions
+	}
+	return options
+}
+
+// populateOptionsForGRPC creates a map of nodegroup ID and options, as well as a slice of Options objects for the gRPC call
+func populateOptionsForGRPC(expansionOptions []expander.Option) ([]*protos.Option, map[string]expander.Option) {
+	grpcOptionsSlice := []*protos.Option{}
+	nodeGroupIDOptionMap := make(map[string]expander.Option)
+	for _, option := range expansionOptions {
+		nodeGroupIDOptionMap[option.NodeGroup.Id()] = option
+		grpcOptionsSlice = append(grpcOptionsSlice, newOptionMessage(option.NodeGroup.Id(), int32(option.NodeCount), option.Debug, option.Pods))
+	}
+	return grpcOptionsSlice, nodeGroupIDOptionMap
+}
+
+// populateNodeInfoForGRPC looks at the corresponding v1.Node object per NodeInfo object, and populates the grpcNodeInfoMap with these to pass over grpc
+func populateNodeInfoForGRPC(nodeInfos map[string]*schedulerframework.NodeInfo) map[string]*v1.Node {
+	grpcNodeInfoMap := make(map[string]*v1.Node)
+	for nodeId, nodeInfo := range nodeInfos {
+		grpcNodeInfoMap[nodeId] = nodeInfo.Node()
+	}
+	return grpcNodeInfoMap
+}
+
+func transformAndSanitizeOptionsFromGRPC(bestOptionsResponseOptions []*protos.Option, nodeGroupIDOptionMap map[string]expander.Option) []expander.Option {
+	var options []expander.Option
+	for _, option := range bestOptionsResponseOptions {
+		if option == nil {
+			klog.Errorf("GRPC server returned nil Option")
+			continue
+		}
+		if _, ok := nodeGroupIDOptionMap[option.NodeGroupId]; ok {
+			options = append(options, nodeGroupIDOptionMap[option.NodeGroupId])
+		} else {
+			klog.Errorf("GRPC server returned invalid nodeGroup ID: %s", option.NodeGroupId)
+			continue
+		}
+	}
+	return options
+}
+
+func newOptionMessage(nodeGroupId string, nodeCount int32, debug string, pods []*v1.Pod) *protos.Option {
+	return &protos.Option{NodeGroupId: nodeGroupId, NodeCount: nodeCount, Debug: debug, Pod: pods}
+}
diff --git a/cluster-autoscaler/expander/grpcplugin/grpc_client_test.go b/cluster-autoscaler/expander/grpcplugin/grpc_client_test.go
new file mode 100644
index 000000000000..65b94a17d54b
--- /dev/null
+++ b/cluster-autoscaler/expander/grpcplugin/grpc_client_test.go
@@ -0,0 +1,276 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package grpcplugin + +import ( + "errors" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos" + "k8s.io/autoscaler/cluster-autoscaler/expander/mocks" + . "k8s.io/autoscaler/cluster-autoscaler/utils/test" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/expander" + + _ "github.com/golang/mock/mockgen/model" +) + +var ( + nodes = []*v1.Node{ + BuildTestNode("n1", 1000, 1000), + BuildTestNode("n2", 1000, 1000), + BuildTestNode("n3", 1000, 1000), + BuildTestNode("n4", 1000, 1000), + } + + eoT2Micro = expander.Option{ + Debug: "t2.micro", + NodeGroup: test.NewTestNodeGroup("my-asg.t2.micro", 10, 1, 1, true, false, "t2.micro", nil, nil), + } + eoT2Large = expander.Option{ + Debug: "t2.large", + NodeGroup: test.NewTestNodeGroup("my-asg.t2.large", 10, 1, 1, true, false, "t2.large", nil, nil), + } + eoT3Large = expander.Option{ + Debug: "t3.large", + NodeGroup: test.NewTestNodeGroup("my-asg.t3.large", 10, 1, 1, true, false, "t3.large", nil, nil), + } + eoM44XLarge = expander.Option{ + Debug: "m4.4xlarge", + NodeGroup: test.NewTestNodeGroup("my-asg.m4.4xlarge", 10, 1, 1, true, false, "m4.4xlarge", nil, nil), + } + options = []expander.Option{eoT2Micro, eoT2Large, eoT3Large, eoM44XLarge} + + grpcEoT2Micro = protos.Option{ + NodeGroupId: eoT2Micro.NodeGroup.Id(), + NodeCount: int32(eoT2Micro.NodeCount), + Debug: eoT2Micro.Debug, + Pod: eoT2Micro.Pods, + } + grpcEoT2Large = protos.Option{ + NodeGroupId: eoT2Large.NodeGroup.Id(), + NodeCount: int32(eoT2Large.NodeCount), + Debug: eoT2Large.Debug, + Pod: eoT2Large.Pods, + } + grpcEoT3Large = protos.Option{ + NodeGroupId: eoT3Large.NodeGroup.Id(), + NodeCount: int32(eoT3Large.NodeCount), + Debug: eoT3Large.Debug, + Pod: eoT3Large.Pods, + } + grpcEoM44XLarge = protos.Option{ + NodeGroupId: eoM44XLarge.NodeGroup.Id(), + NodeCount: int32(eoM44XLarge.NodeCount), + Debug: eoM44XLarge.Debug, + Pod: eoM44XLarge.Pods, + } +) + +func TestPopulateOptionsForGrpc(t *testing.T) { + testCases := []struct { + desc string + opts []expander.Option + expectedOpts []*protos.Option + expectedMap map[string]expander.Option + }{ + { + desc: "empty options", + opts: []expander.Option{}, + expectedOpts: []*protos.Option{}, + expectedMap: map[string]expander.Option{}, + }, + { + desc: "one option", + opts: []expander.Option{eoT2Micro}, + expectedOpts: []*protos.Option{&grpcEoT2Micro}, + expectedMap: map[string]expander.Option{eoT2Micro.NodeGroup.Id(): eoT2Micro}, + }, + { + desc: "many options", + opts: options, + expectedOpts: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}, + expectedMap: map[string]expander.Option{ + eoT2Micro.NodeGroup.Id(): eoT2Micro, + eoT2Large.NodeGroup.Id(): eoT2Large, + eoT3Large.NodeGroup.Id(): eoT3Large, + eoM44XLarge.NodeGroup.Id(): eoM44XLarge, + }, + }, + } + for _, tc := range testCases { + grpcOptionsSlice, nodeGroupIDOptionMap := populateOptionsForGRPC(tc.opts) + assert.Equal(t, tc.expectedOpts, grpcOptionsSlice) + assert.Equal(t, tc.expectedMap, nodeGroupIDOptionMap) + } +} + +func makeFakeNodeInfos() map[string]*schedulerframework.NodeInfo { + nodeInfos := make(map[string]*schedulerframework.NodeInfo) + for i, opt := range options { + nodeInfo := schedulerframework.NewNodeInfo() + nodeInfo.SetNode(nodes[i]) + nodeInfos[opt.NodeGroup.Id()] = nodeInfo + } + 
return nodeInfos +} + +func TestPopulateNodeInfoForGRPC(t *testing.T) { + nodeInfos := makeFakeNodeInfos() + grpcNodeInfoMap := populateNodeInfoForGRPC(nodeInfos) + + expectedGrpcNodeInfoMap := make(map[string]*v1.Node) + for i, opt := range options { + expectedGrpcNodeInfoMap[opt.NodeGroup.Id()] = nodes[i] + } + assert.Equal(t, expectedGrpcNodeInfoMap, grpcNodeInfoMap) +} + +func TestValidTransformAndSanitizeOptionsFromGRPC(t *testing.T) { + responseOptionsSlice := []*protos.Option{&grpcEoT2Micro, &grpcEoT3Large, &grpcEoM44XLarge} + nodeGroupIDOptionMap := map[string]expander.Option{ + eoT2Micro.NodeGroup.Id(): eoT2Micro, + eoT2Large.NodeGroup.Id(): eoT2Large, + eoT3Large.NodeGroup.Id(): eoT3Large, + eoM44XLarge.NodeGroup.Id(): eoM44XLarge, + } + + expectedOptions := []expander.Option{eoT2Micro, eoT3Large, eoM44XLarge} + + ret := transformAndSanitizeOptionsFromGRPC(responseOptionsSlice, nodeGroupIDOptionMap) + assert.Equal(t, expectedOptions, ret) +} + +func TestAnInvalidTransformAndSanitizeOptionsFromGRPC(t *testing.T) { + responseOptionsSlice := []*protos.Option{&grpcEoT2Micro, &grpcEoT3Large, &grpcEoM44XLarge} + nodeGroupIDOptionMap := map[string]expander.Option{ + eoT2Micro.NodeGroup.Id(): eoT2Micro, + eoT2Large.NodeGroup.Id(): eoT2Large, + eoT3Large.NodeGroup.Id(): eoT3Large, + } + + ret := transformAndSanitizeOptionsFromGRPC(responseOptionsSlice, nodeGroupIDOptionMap) + assert.Equal(t, []expander.Option{eoT2Micro, eoT3Large}, ret) +} + +func TestBestOptionsValid(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := mocks.NewMockExpanderClient(ctrl) + g := &grpcclientstrategy{mockClient} + + nodeInfos := makeFakeNodeInfos() + grpcNodeInfoMap := make(map[string]*v1.Node) + for i, opt := range options { + grpcNodeInfoMap[opt.NodeGroup.Id()] = nodes[i] + } + expectedBestOptionsReq := &protos.BestOptionsRequest{ + Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}, + NodeMap: grpcNodeInfoMap, + } + + mockClient.EXPECT().BestOptions( + gomock.Any(), gomock.Eq(expectedBestOptionsReq), + ).Return(&protos.BestOptionsResponse{Options: []*protos.Option{&grpcEoT3Large}}, nil) + + resp := g.BestOptions(options, nodeInfos) + + assert.Equal(t, resp, []expander.Option{eoT3Large}) +} + +// All test cases should error, and no options should be filtered +func TestBestOptionsErrors(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := mocks.NewMockExpanderClient(ctrl) + g := grpcclientstrategy{mockClient} + + badProtosOption := protos.Option{ + NodeGroupId: "badID", + NodeCount: int32(eoM44XLarge.NodeCount), + Debug: eoM44XLarge.Debug, + Pod: eoM44XLarge.Pods, + } + + testCases := []struct { + desc string + client grpcclientstrategy + nodeInfo map[string]*schedulerframework.NodeInfo + mockResponse protos.BestOptionsResponse + errResponse error + }{ + { + desc: "Bad gRPC client config", + client: grpcclientstrategy{nil}, + nodeInfo: makeFakeNodeInfos(), + mockResponse: protos.BestOptionsResponse{}, + errResponse: nil, + }, + { + desc: "gRPC error response", + client: g, + nodeInfo: makeFakeNodeInfos(), + mockResponse: protos.BestOptionsResponse{}, + errResponse: errors.New("timeout error"), + }, + { + desc: "bad bestOptions response", + client: g, + nodeInfo: makeFakeNodeInfos(), + mockResponse: protos.BestOptionsResponse{}, + errResponse: nil, + }, + { + desc: "bad bestOptions response, options nil", + client: g, + nodeInfo: makeFakeNodeInfos(), + mockResponse: 
protos.BestOptionsResponse{Options: nil}, + errResponse: nil, + }, + { + desc: "bad bestOptions response, options invalid - nil", + client: g, + nodeInfo: makeFakeNodeInfos(), + mockResponse: protos.BestOptionsResponse{Options: []*protos.Option{&grpcEoT2Micro, nil, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}}, + errResponse: nil, + }, + { + desc: "bad bestOptions response, options invalid - nonExistent nodeID", + client: g, + nodeInfo: makeFakeNodeInfos(), + mockResponse: protos.BestOptionsResponse{Options: []*protos.Option{&grpcEoT2Micro, &badProtosOption, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}}, + errResponse: nil, + }, + } + for _, tc := range testCases { + grpcNodeInfoMap := populateNodeInfoForGRPC(tc.nodeInfo) + mockClient.EXPECT().BestOptions( + gomock.Any(), gomock.Eq( + &protos.BestOptionsRequest{ + Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}, + NodeMap: grpcNodeInfoMap, + })).Return(&tc.mockResponse, tc.errResponse) + resp := g.BestOptions(options, tc.nodeInfo) + + assert.Equal(t, resp, options) + } +} diff --git a/cluster-autoscaler/expander/grpcplugin/protos/expander.pb.go b/cluster-autoscaler/expander/grpcplugin/protos/expander.pb.go new file mode 100644 index 000000000000..3c071d2f37d7 --- /dev/null +++ b/cluster-autoscaler/expander/grpcplugin/protos/expander.pb.go @@ -0,0 +1,440 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protos + +import ( + context "context" + reflect "reflect" + sync "sync" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + v1 "k8s.io/api/core/v1" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. 
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type BestOptionsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Options []*Option `protobuf:"bytes,1,rep,name=options,proto3" json:"options,omitempty"` + // key is node id from options + NodeMap map[string]*v1.Node `protobuf:"bytes,2,rep,name=nodeMap,proto3" json:"nodeMap,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *BestOptionsRequest) Reset() { + *x = BestOptionsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BestOptionsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BestOptionsRequest) ProtoMessage() {} + +func (x *BestOptionsRequest) ProtoReflect() protoreflect.Message { + mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BestOptionsRequest.ProtoReflect.Descriptor instead. +func (*BestOptionsRequest) Descriptor() ([]byte, []int) { + return file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescGZIP(), []int{0} +} + +func (x *BestOptionsRequest) GetOptions() []*Option { + if x != nil { + return x.Options + } + return nil +} + +func (x *BestOptionsRequest) GetNodeMap() map[string]*v1.Node { + if x != nil { + return x.NodeMap + } + return nil +} + +type BestOptionsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Options []*Option `protobuf:"bytes,1,rep,name=options,proto3" json:"options,omitempty"` +} + +func (x *BestOptionsResponse) Reset() { + *x = BestOptionsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BestOptionsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BestOptionsResponse) ProtoMessage() {} + +func (x *BestOptionsResponse) ProtoReflect() protoreflect.Message { + mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BestOptionsResponse.ProtoReflect.Descriptor instead. +func (*BestOptionsResponse) Descriptor() ([]byte, []int) { + return file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescGZIP(), []int{1} +} + +func (x *BestOptionsResponse) GetOptions() []*Option { + if x != nil { + return x.Options + } + return nil +} + +type Option struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // only need the ID of node to uniquely identify the nodeGroup, used in the nodeInfo map. 
+ NodeGroupId string `protobuf:"bytes,1,opt,name=nodeGroupId,proto3" json:"nodeGroupId,omitempty"` + NodeCount int32 `protobuf:"varint,2,opt,name=nodeCount,proto3" json:"nodeCount,omitempty"` + Debug string `protobuf:"bytes,3,opt,name=debug,proto3" json:"debug,omitempty"` + Pod []*v1.Pod `protobuf:"bytes,4,rep,name=pod,proto3" json:"pod,omitempty"` +} + +func (x *Option) Reset() { + *x = Option{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Option) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Option) ProtoMessage() {} + +func (x *Option) ProtoReflect() protoreflect.Message { + mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Option.ProtoReflect.Descriptor instead. +func (*Option) Descriptor() ([]byte, []int) { + return file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescGZIP(), []int{2} +} + +func (x *Option) GetNodeGroupId() string { + if x != nil { + return x.NodeGroupId + } + return "" +} + +func (x *Option) GetNodeCount() int32 { + if x != nil { + return x.NodeCount + } + return 0 +} + +func (x *Option) GetDebug() string { + if x != nil { + return x.Debug + } + return "" +} + +func (x *Option) GetPod() []*v1.Pod { + if x != nil { + return x.Pod + } + return nil +} + +var File_cluster_autoscaler_expander_grpcplugin_protos_expander_proto protoreflect.FileDescriptor + +var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDesc = []byte{ + 0x0a, 0x3c, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2d, 0x61, 0x75, 0x74, 0x6f, 0x73, 0x63, + 0x61, 0x6c, 0x65, 0x72, 0x2f, 0x65, 0x78, 0x70, 0x61, 0x6e, 0x64, 0x65, 0x72, 0x2f, 0x67, 0x72, + 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, + 0x65, 0x78, 0x70, 0x61, 0x6e, 0x64, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, + 0x67, 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x1a, 0x22, 0x6b, 0x38, 0x73, 0x2e, + 0x69, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x67, + 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xdf, + 0x01, 0x0a, 0x12, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, + 0x67, 0x69, 0x6e, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x12, 0x45, 0x0a, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x70, 0x18, 0x02, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, + 0x6e, 0x2e, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, + 0x79, 0x52, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x70, 0x1a, 0x54, 0x0a, 0x0c, 0x4e, 0x6f, + 0x64, 0x65, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, + 0x79, 0x18, 0x01, 
0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6b, 0x38, + 0x73, 0x2e, 0x69, 0x6f, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, + 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, + 0x22, 0x43, 0x0a, 0x13, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2c, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x70, + 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x6f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x89, 0x01, 0x0a, 0x06, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x20, 0x0a, 0x0b, 0x6e, 0x6f, 0x64, 0x65, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x49, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6e, 0x6f, 0x64, 0x65, 0x47, 0x72, 0x6f, 0x75, 0x70, + 0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74, + 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x62, 0x75, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x64, 0x65, 0x62, 0x75, 0x67, 0x12, 0x29, 0x0a, 0x03, 0x70, 0x6f, 0x64, 0x18, 0x04, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6b, 0x38, 0x73, 0x2e, 0x69, 0x6f, 0x2e, 0x61, 0x70, 0x69, + 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x6f, 0x64, 0x52, 0x03, 0x70, 0x6f, + 0x64, 0x32, 0x5c, 0x0a, 0x08, 0x45, 0x78, 0x70, 0x61, 0x6e, 0x64, 0x65, 0x72, 0x12, 0x50, 0x0a, + 0x0b, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1e, 0x2e, 0x67, + 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x67, + 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, + 0x2f, 0x5a, 0x2d, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2d, 0x61, 0x75, 0x74, 0x6f, 0x73, + 0x63, 0x61, 0x6c, 0x65, 0x72, 0x2f, 0x65, 0x78, 0x70, 0x61, 0x6e, 0x64, 0x65, 0x72, 0x2f, 0x67, + 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescOnce sync.Once + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescData = file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDesc +) + +func file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescGZIP() []byte { + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescOnce.Do(func() { + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescData = protoimpl.X.CompressGZIP(file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescData) + }) + return file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescData +} + +var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_goTypes = []interface{}{ + (*BestOptionsRequest)(nil), // 
0: grpcplugin.BestOptionsRequest + (*BestOptionsResponse)(nil), // 1: grpcplugin.BestOptionsResponse + (*Option)(nil), // 2: grpcplugin.Option + nil, // 3: grpcplugin.BestOptionsRequest.NodeMapEntry + (*v1.Pod)(nil), // 4: k8s.io.api.core.v1.Pod + (*v1.Node)(nil), // 5: k8s.io.api.core.v1.Node +} +var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_depIdxs = []int32{ + 2, // 0: grpcplugin.BestOptionsRequest.options:type_name -> grpcplugin.Option + 3, // 1: grpcplugin.BestOptionsRequest.nodeMap:type_name -> grpcplugin.BestOptionsRequest.NodeMapEntry + 2, // 2: grpcplugin.BestOptionsResponse.options:type_name -> grpcplugin.Option + 4, // 3: grpcplugin.Option.pod:type_name -> k8s.io.api.core.v1.Pod + 5, // 4: grpcplugin.BestOptionsRequest.NodeMapEntry.value:type_name -> k8s.io.api.core.v1.Node + 0, // 5: grpcplugin.Expander.BestOptions:input_type -> grpcplugin.BestOptionsRequest + 1, // 6: grpcplugin.Expander.BestOptions:output_type -> grpcplugin.BestOptionsResponse + 6, // [6:7] is the sub-list for method output_type + 5, // [5:6] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name +} + +func init() { file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_init() } +func file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_init() { + if File_cluster_autoscaler_expander_grpcplugin_protos_expander_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BestOptionsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BestOptionsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Option); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_goTypes, + DependencyIndexes: file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_depIdxs, + MessageInfos: file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes, + }.Build() + File_cluster_autoscaler_expander_grpcplugin_protos_expander_proto = out.File + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDesc = nil + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_goTypes = nil + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_depIdxs = nil +} + +// Reference imports to suppress errors if they are not otherwise used. 
+var _ context.Context
+var _ grpc.ClientConnInterface
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion6
+
+// ExpanderClient is the client API for Expander service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type ExpanderClient interface {
+	BestOptions(ctx context.Context, in *BestOptionsRequest, opts ...grpc.CallOption) (*BestOptionsResponse, error)
+}
+
+type expanderClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewExpanderClient(cc grpc.ClientConnInterface) ExpanderClient {
+	return &expanderClient{cc}
+}
+
+func (c *expanderClient) BestOptions(ctx context.Context, in *BestOptionsRequest, opts ...grpc.CallOption) (*BestOptionsResponse, error) {
+	out := new(BestOptionsResponse)
+	err := c.cc.Invoke(ctx, "/grpcplugin.Expander/BestOptions", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// ExpanderServer is the server API for Expander service.
+type ExpanderServer interface {
+	BestOptions(context.Context, *BestOptionsRequest) (*BestOptionsResponse, error)
+}
+
+// UnimplementedExpanderServer can be embedded to have forward compatible implementations.
+type UnimplementedExpanderServer struct {
+}
+
+func (*UnimplementedExpanderServer) BestOptions(context.Context, *BestOptionsRequest) (*BestOptionsResponse, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method BestOptions not implemented")
+}
+
+func RegisterExpanderServer(s *grpc.Server, srv ExpanderServer) {
+	s.RegisterService(&_Expander_serviceDesc, srv)
+}
+
+func _Expander_BestOptions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(BestOptionsRequest)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(ExpanderServer).BestOptions(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/grpcplugin.Expander/BestOptions",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(ExpanderServer).BestOptions(ctx, req.(*BestOptionsRequest))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+var _Expander_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "grpcplugin.Expander",
+	HandlerType: (*ExpanderServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "BestOptions",
+			Handler:    _Expander_BestOptions_Handler,
+		},
+	},
+	Streams:  []grpc.StreamDesc{},
+	Metadata: "cluster-autoscaler/expander/grpcplugin/protos/expander.proto",
+}
diff --git a/cluster-autoscaler/expander/grpcplugin/protos/expander.proto b/cluster-autoscaler/expander/grpcplugin/protos/expander.proto
new file mode 100644
index 000000000000..5a08e8ff301b
--- /dev/null
+++ b/cluster-autoscaler/expander/grpcplugin/protos/expander.proto
@@ -0,0 +1,30 @@
+syntax = "proto3";
+
+package grpcplugin;
+import "k8s.io/api/core/v1/generated.proto";
+option go_package = "cluster-autoscaler/expander/grpcplugin/protos";
+
+
+
+// Interface for Expander
+service Expander {
+
+  rpc BestOptions (BestOptionsRequest)
+    returns (BestOptionsResponse) {}
+}
+
+message BestOptionsRequest {
+  repeated Option options = 1;
+  // key is node id from options
+  map<string, k8s.io.api.core.v1.Node> nodeMap = 2;
+}
+message BestOptionsResponse {
+  repeated Option options = 1;
+}
+message Option {
+  // only need the ID of node to
uniquely identify the nodeGroup, used in the nodeInfo map. + string nodeGroupId = 1; + int32 nodeCount = 2; + string debug = 3; + repeated k8s.io.api.core.v1.Pod pod = 4; +} diff --git a/cluster-autoscaler/expander/mocks/GRPCPluginExpander.go b/cluster-autoscaler/expander/mocks/GRPCPluginExpander.go new file mode 100644 index 000000000000..358fcfcc857e --- /dev/null +++ b/cluster-autoscaler/expander/mocks/GRPCPluginExpander.go @@ -0,0 +1,107 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mocks + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + grpc "google.golang.org/grpc" + "k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos" +) + +// MockExpanderClient is a mock of ExpanderClient interface. +type MockExpanderClient struct { + ctrl *gomock.Controller + recorder *MockExpanderClientMockRecorder +} + +// MockExpanderClientMockRecorder is the mock recorder for MockExpanderClient. +type MockExpanderClientMockRecorder struct { + mock *MockExpanderClient +} + +// NewMockExpanderClient creates a new mock instance. +func NewMockExpanderClient(ctrl *gomock.Controller) *MockExpanderClient { + mock := &MockExpanderClient{ctrl: ctrl} + mock.recorder = &MockExpanderClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockExpanderClient) EXPECT() *MockExpanderClientMockRecorder { + return m.recorder +} + +// BestOptions mocks base method. +func (m *MockExpanderClient) BestOptions(ctx context.Context, in *protos.BestOptionsRequest, opts ...grpc.CallOption) (*protos.BestOptionsResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "BestOptions", varargs...) + ret0, _ := ret[0].(*protos.BestOptionsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BestOptions indicates an expected call of BestOptions. +func (mr *MockExpanderClientMockRecorder) BestOptions(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BestOptions", reflect.TypeOf((*MockExpanderClient)(nil).BestOptions), varargs...) +} + +// MockExpanderServer is a mock of ExpanderServer interface. +type MockExpanderServer struct { + ctrl *gomock.Controller + recorder *MockExpanderServerMockRecorder +} + +// MockExpanderServerMockRecorder is the mock recorder for MockExpanderServer. +type MockExpanderServerMockRecorder struct { + mock *MockExpanderServer +} + +// NewMockExpanderServer creates a new mock instance. +func NewMockExpanderServer(ctrl *gomock.Controller) *MockExpanderServer { + mock := &MockExpanderServer{ctrl: ctrl} + mock.recorder = &MockExpanderServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. 
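A hedged aside before the remaining mock methods below: the generated bindings above are everything an out-of-tree plugin needs to serve BestOptions. A minimal sketch, not part of this patch; the pick-first strategy and the listen address are illustrative assumptions.

package main

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"
	"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
)

// pickFirst is a hypothetical expander plugin that returns the first option
// it is offered; a real plugin would rank req.Options using req.NodeMap.
type pickFirst struct {
	protos.UnimplementedExpanderServer
}

func (pickFirst) BestOptions(ctx context.Context, req *protos.BestOptionsRequest) (*protos.BestOptionsResponse, error) {
	if len(req.GetOptions()) == 0 {
		return &protos.BestOptionsResponse{}, nil
	}
	return &protos.BestOptionsResponse{Options: req.GetOptions()[:1]}, nil
}

func main() {
	lis, err := net.Listen("tcp", ":7000") // illustrative address
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	protos.RegisterExpanderServer(s, pickFirst{})
	log.Fatal(s.Serve(lis))
}

The autoscaler side would then be pointed at such a server via the --grpc-expander-url flag added in main.go.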
+func (m *MockExpanderServer) EXPECT() *MockExpanderServerMockRecorder { + return m.recorder +} + +// BestOptions mocks base method. +func (m *MockExpanderServer) BestOptions(arg0 context.Context, arg1 *protos.BestOptionsRequest) (*protos.BestOptionsResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BestOptions", arg0, arg1) + ret0, _ := ret[0].(*protos.BestOptionsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BestOptions indicates an expected call of BestOptions. +func (mr *MockExpanderServerMockRecorder) BestOptions(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BestOptions", reflect.TypeOf((*MockExpanderServer)(nil).BestOptions), arg0, arg1) +} diff --git a/cluster-autoscaler/go.mod b/cluster-autoscaler/go.mod index bc2f647113d5..21b78d2ce167 100644 --- a/cluster-autoscaler/go.mod +++ b/cluster-autoscaler/go.mod @@ -25,6 +25,8 @@ require ( golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 google.golang.org/api v0.46.0 + google.golang.org/grpc v1.42.0 + google.golang.org/protobuf v1.27.1 gopkg.in/gcfg.v1 v1.2.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.23.5 diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 0c067bf71818..fa16c2f7f2ff 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -152,6 +152,9 @@ var ( expanderFlag = flag.String("expander", expander.RandomExpanderName, "Type of node group expander to be used in scale up. Available values: ["+strings.Join(expander.AvailableExpanders, ",")+"]. Specifying multiple values separated by commas will call the expanders in succession until there is only one option remaining. Ties still existing after this process are broken randomly.") + grpcExpanderCert = flag.String("grpc-expander-cert", "", "Path to cert used by gRPC server over TLS") + grpcExpanderURL = flag.String("grpc-expander-url", "", "URL to reach gRPC expander server.") + ignoreDaemonSetsUtilization = flag.Bool("ignore-daemonsets-utilization", false, "Should CA ignore DaemonSet pods when calculating resource utilization for scaling down") ignoreMirrorPodsUtilization = flag.Bool("ignore-mirror-pods-utilization", false, @@ -216,6 +219,8 @@ func createAutoscalingOptions() config.AutoscalingOptions { ScaleUpFromZero: *scaleUpFromZero, EstimatorName: *estimatorFlag, ExpanderNames: *expanderFlag, + GRPCExpanderCert: *grpcExpanderCert, + GRPCExpanderURL: *grpcExpanderURL, IgnoreDaemonSetsUtilization: *ignoreDaemonSetsUtilization, IgnoreMirrorPodsUtilization: *ignoreMirrorPodsUtilization, MaxBulkSoftTaintCount: *maxBulkSoftTaintCount, diff --git a/cluster-autoscaler/vendor/github.com/golang/mock/mockgen/model/model.go b/cluster-autoscaler/vendor/github.com/golang/mock/mockgen/model/model.go new file mode 100644 index 000000000000..2c6a62ceb268 --- /dev/null +++ b/cluster-autoscaler/vendor/github.com/golang/mock/mockgen/model/model.go @@ -0,0 +1,495 @@ +// Copyright 2012 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +// Package model contains the data model necessary for generating mock implementations. +package model + +import ( + "encoding/gob" + "fmt" + "io" + "reflect" + "strings" +) + +// pkgPath is the importable path for package model +const pkgPath = "github.com/golang/mock/mockgen/model" + +// Package is a Go package. It may be a subset. +type Package struct { + Name string + PkgPath string + Interfaces []*Interface + DotImports []string +} + +// Print writes the package name and its exported interfaces. +func (pkg *Package) Print(w io.Writer) { + _, _ = fmt.Fprintf(w, "package %s\n", pkg.Name) + for _, intf := range pkg.Interfaces { + intf.Print(w) + } +} + +// Imports returns the imports needed by the Package as a set of import paths. +func (pkg *Package) Imports() map[string]bool { + im := make(map[string]bool) + for _, intf := range pkg.Interfaces { + intf.addImports(im) + } + return im +} + +// Interface is a Go interface. +type Interface struct { + Name string + Methods []*Method +} + +// Print writes the interface name and its methods. +func (intf *Interface) Print(w io.Writer) { + _, _ = fmt.Fprintf(w, "interface %s\n", intf.Name) + for _, m := range intf.Methods { + m.Print(w) + } +} + +func (intf *Interface) addImports(im map[string]bool) { + for _, m := range intf.Methods { + m.addImports(im) + } +} + +// AddMethod adds a new method, de-duplicating by method name. +func (intf *Interface) AddMethod(m *Method) { + for _, me := range intf.Methods { + if me.Name == m.Name { + return + } + } + intf.Methods = append(intf.Methods, m) +} + +// Method is a single method of an interface. +type Method struct { + Name string + In, Out []*Parameter + Variadic *Parameter // may be nil +} + +// Print writes the method name and its signature. +func (m *Method) Print(w io.Writer) { + _, _ = fmt.Fprintf(w, " - method %s\n", m.Name) + if len(m.In) > 0 { + _, _ = fmt.Fprintf(w, " in:\n") + for _, p := range m.In { + p.Print(w) + } + } + if m.Variadic != nil { + _, _ = fmt.Fprintf(w, " ...:\n") + m.Variadic.Print(w) + } + if len(m.Out) > 0 { + _, _ = fmt.Fprintf(w, " out:\n") + for _, p := range m.Out { + p.Print(w) + } + } +} + +func (m *Method) addImports(im map[string]bool) { + for _, p := range m.In { + p.Type.addImports(im) + } + if m.Variadic != nil { + m.Variadic.Type.addImports(im) + } + for _, p := range m.Out { + p.Type.addImports(im) + } +} + +// Parameter is an argument or return parameter of a method. +type Parameter struct { + Name string // may be empty + Type Type +} + +// Print writes a method parameter. +func (p *Parameter) Print(w io.Writer) { + n := p.Name + if n == "" { + n = `""` + } + _, _ = fmt.Fprintf(w, " - %v: %v\n", n, p.Type.String(nil, "")) +} + +// Type is a Go type. +type Type interface { + String(pm map[string]string, pkgOverride string) string + addImports(im map[string]bool) +} + +func init() { + gob.Register(&ArrayType{}) + gob.Register(&ChanType{}) + gob.Register(&FuncType{}) + gob.Register(&MapType{}) + gob.Register(&NamedType{}) + gob.Register(&PointerType{}) + + // Call gob.RegisterName to make sure it has the consistent name registered + // for both gob decoder and encoder. + // + // For a non-pointer type, gob.Register will try to get package full path by + // calling rt.PkgPath() for a name to register. 
If your project has vendor + // directory, it is possible that PkgPath will get a path like this: + // ../../../vendor/github.com/golang/mock/mockgen/model + gob.RegisterName(pkgPath+".PredeclaredType", PredeclaredType("")) +} + +// ArrayType is an array or slice type. +type ArrayType struct { + Len int // -1 for slices, >= 0 for arrays + Type Type +} + +func (at *ArrayType) String(pm map[string]string, pkgOverride string) string { + s := "[]" + if at.Len > -1 { + s = fmt.Sprintf("[%d]", at.Len) + } + return s + at.Type.String(pm, pkgOverride) +} + +func (at *ArrayType) addImports(im map[string]bool) { at.Type.addImports(im) } + +// ChanType is a channel type. +type ChanType struct { + Dir ChanDir // 0, 1 or 2 + Type Type +} + +func (ct *ChanType) String(pm map[string]string, pkgOverride string) string { + s := ct.Type.String(pm, pkgOverride) + if ct.Dir == RecvDir { + return "<-chan " + s + } + if ct.Dir == SendDir { + return "chan<- " + s + } + return "chan " + s +} + +func (ct *ChanType) addImports(im map[string]bool) { ct.Type.addImports(im) } + +// ChanDir is a channel direction. +type ChanDir int + +// Constants for channel directions. +const ( + RecvDir ChanDir = 1 + SendDir ChanDir = 2 +) + +// FuncType is a function type. +type FuncType struct { + In, Out []*Parameter + Variadic *Parameter // may be nil +} + +func (ft *FuncType) String(pm map[string]string, pkgOverride string) string { + args := make([]string, len(ft.In)) + for i, p := range ft.In { + args[i] = p.Type.String(pm, pkgOverride) + } + if ft.Variadic != nil { + args = append(args, "..."+ft.Variadic.Type.String(pm, pkgOverride)) + } + rets := make([]string, len(ft.Out)) + for i, p := range ft.Out { + rets[i] = p.Type.String(pm, pkgOverride) + } + retString := strings.Join(rets, ", ") + if nOut := len(ft.Out); nOut == 1 { + retString = " " + retString + } else if nOut > 1 { + retString = " (" + retString + ")" + } + return "func(" + strings.Join(args, ", ") + ")" + retString +} + +func (ft *FuncType) addImports(im map[string]bool) { + for _, p := range ft.In { + p.Type.addImports(im) + } + if ft.Variadic != nil { + ft.Variadic.Type.addImports(im) + } + for _, p := range ft.Out { + p.Type.addImports(im) + } +} + +// MapType is a map type. +type MapType struct { + Key, Value Type +} + +func (mt *MapType) String(pm map[string]string, pkgOverride string) string { + return "map[" + mt.Key.String(pm, pkgOverride) + "]" + mt.Value.String(pm, pkgOverride) +} + +func (mt *MapType) addImports(im map[string]bool) { + mt.Key.addImports(im) + mt.Value.addImports(im) +} + +// NamedType is an exported type in a package. +type NamedType struct { + Package string // may be empty + Type string +} + +func (nt *NamedType) String(pm map[string]string, pkgOverride string) string { + if pkgOverride == nt.Package { + return nt.Type + } + prefix := pm[nt.Package] + if prefix != "" { + return prefix + "." + nt.Type + } + + return nt.Type +} + +func (nt *NamedType) addImports(im map[string]bool) { + if nt.Package != "" { + im[nt.Package] = true + } +} + +// PointerType is a pointer to another type. +type PointerType struct { + Type Type +} + +func (pt *PointerType) String(pm map[string]string, pkgOverride string) string { + return "*" + pt.Type.String(pm, pkgOverride) +} +func (pt *PointerType) addImports(im map[string]bool) { pt.Type.addImports(im) } + +// PredeclaredType is a predeclared type such as "int". 
+type PredeclaredType string + +func (pt PredeclaredType) String(map[string]string, string) string { return string(pt) } +func (pt PredeclaredType) addImports(map[string]bool) {} + +// The following code is intended to be called by the program generated by ../reflect.go. + +// InterfaceFromInterfaceType returns a pointer to an interface for the +// given reflection interface type. +func InterfaceFromInterfaceType(it reflect.Type) (*Interface, error) { + if it.Kind() != reflect.Interface { + return nil, fmt.Errorf("%v is not an interface", it) + } + intf := &Interface{} + + for i := 0; i < it.NumMethod(); i++ { + mt := it.Method(i) + // TODO: need to skip unexported methods? or just raise an error? + m := &Method{ + Name: mt.Name, + } + + var err error + m.In, m.Variadic, m.Out, err = funcArgsFromType(mt.Type) + if err != nil { + return nil, err + } + + intf.AddMethod(m) + } + + return intf, nil +} + +// t's Kind must be a reflect.Func. +func funcArgsFromType(t reflect.Type) (in []*Parameter, variadic *Parameter, out []*Parameter, err error) { + nin := t.NumIn() + if t.IsVariadic() { + nin-- + } + var p *Parameter + for i := 0; i < nin; i++ { + p, err = parameterFromType(t.In(i)) + if err != nil { + return + } + in = append(in, p) + } + if t.IsVariadic() { + p, err = parameterFromType(t.In(nin).Elem()) + if err != nil { + return + } + variadic = p + } + for i := 0; i < t.NumOut(); i++ { + p, err = parameterFromType(t.Out(i)) + if err != nil { + return + } + out = append(out, p) + } + return +} + +func parameterFromType(t reflect.Type) (*Parameter, error) { + tt, err := typeFromType(t) + if err != nil { + return nil, err + } + return &Parameter{Type: tt}, nil +} + +var errorType = reflect.TypeOf((*error)(nil)).Elem() + +var byteType = reflect.TypeOf(byte(0)) + +func typeFromType(t reflect.Type) (Type, error) { + // Hack workaround for https://golang.org/issue/3853. + // This explicit check should not be necessary. + if t == byteType { + return PredeclaredType("byte"), nil + } + + if imp := t.PkgPath(); imp != "" { + return &NamedType{ + Package: impPath(imp), + Type: t.Name(), + }, nil + } + + // only unnamed or predeclared types after here + + // Lots of types have element types. Let's do the parsing and error checking for all of them. + var elemType Type + switch t.Kind() { + case reflect.Array, reflect.Chan, reflect.Map, reflect.Ptr, reflect.Slice: + var err error + elemType, err = typeFromType(t.Elem()) + if err != nil { + return nil, err + } + } + + switch t.Kind() { + case reflect.Array: + return &ArrayType{ + Len: t.Len(), + Type: elemType, + }, nil + case reflect.Bool, reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, + reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr, + reflect.Float32, reflect.Float64, reflect.Complex64, reflect.Complex128, reflect.String: + return PredeclaredType(t.Kind().String()), nil + case reflect.Chan: + var dir ChanDir + switch t.ChanDir() { + case reflect.RecvDir: + dir = RecvDir + case reflect.SendDir: + dir = SendDir + } + return &ChanType{ + Dir: dir, + Type: elemType, + }, nil + case reflect.Func: + in, variadic, out, err := funcArgsFromType(t) + if err != nil { + return nil, err + } + return &FuncType{ + In: in, + Out: out, + Variadic: variadic, + }, nil + case reflect.Interface: + // Two special interfaces. 
+ if t.NumMethod() == 0 { + return PredeclaredType("interface{}"), nil + } + if t == errorType { + return PredeclaredType("error"), nil + } + case reflect.Map: + kt, err := typeFromType(t.Key()) + if err != nil { + return nil, err + } + return &MapType{ + Key: kt, + Value: elemType, + }, nil + case reflect.Ptr: + return &PointerType{ + Type: elemType, + }, nil + case reflect.Slice: + return &ArrayType{ + Len: -1, + Type: elemType, + }, nil + case reflect.Struct: + if t.NumField() == 0 { + return PredeclaredType("struct{}"), nil + } + } + + // TODO: Struct, UnsafePointer + return nil, fmt.Errorf("can't yet turn %v (%v) into a model.Type", t, t.Kind()) +} + +// impPath sanitizes the package path returned by `PkgPath` method of a reflect Type so that +// it is importable. PkgPath might return a path that includes "vendor". These paths do not +// compile, so we need to remove everything up to and including "/vendor/". +// See https://github.com/golang/go/issues/12019. +func impPath(imp string) string { + if strings.HasPrefix(imp, "vendor/") { + imp = "/" + imp + } + if i := strings.LastIndex(imp, "/vendor/"); i != -1 { + imp = imp[i+len("/vendor/"):] + } + return imp +} + +// ErrorInterface represent built-in error interface. +var ErrorInterface = Interface{ + Name: "error", + Methods: []*Method{ + { + Name: "Error", + Out: []*Parameter{ + { + Name: "", + Type: PredeclaredType("string"), + }, + }, + }, + }, +} diff --git a/cluster-autoscaler/vendor/modules.txt b/cluster-autoscaler/vendor/modules.txt index 0e5d038728c5..60bddcdbb446 100644 --- a/cluster-autoscaler/vendor/modules.txt +++ b/cluster-autoscaler/vendor/modules.txt @@ -280,6 +280,7 @@ github.com/golang/groupcache/lru # github.com/golang/mock v1.6.0 ## explicit github.com/golang/mock/gomock +github.com/golang/mock/mockgen/model # github.com/golang/protobuf v1.5.2 github.com/golang/protobuf/descriptor github.com/golang/protobuf/jsonpb @@ -793,6 +794,7 @@ google.golang.org/genproto/googleapis/api/httpbody google.golang.org/genproto/googleapis/rpc/status google.golang.org/genproto/protobuf/field_mask # google.golang.org/grpc v1.42.0 +## explicit google.golang.org/grpc google.golang.org/grpc/attributes google.golang.org/grpc/backoff @@ -842,6 +844,7 @@ google.golang.org/grpc/stats google.golang.org/grpc/status google.golang.org/grpc/tap # google.golang.org/protobuf v1.27.1 +## explicit google.golang.org/protobuf/encoding/protojson google.golang.org/protobuf/encoding/prototext google.golang.org/protobuf/encoding/protowire diff --git a/hack/verify-golint.sh b/hack/verify-golint.sh index 40f0a324bece..8ac285d8ab60 100755 --- a/hack/verify-golint.sh +++ b/hack/verify-golint.sh @@ -36,6 +36,7 @@ excluded_packages=( 'cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud-sdk-go-v3' 'cluster-autoscaler/cloudprovider/ionoscloud/ionos-cloud-sdk-go' 'cluster-autoscaler/cloudprovider/hetzner/hcloud-go' + 'cluster-autoscaler/expander/grpcplugin/protos' ) FIND_PACKAGES='go list ./... 
' From 24b715ca0aad759a8ea9e7f422bd7ab50270c410 Mon Sep 17 00:00:00 2001 From: Kubernetes Prow Robot Date: Mon, 20 Jun 2022 08:48:05 -0700 Subject: [PATCH 3/6] Merge pull request #4970 from MaciekPytel/estimation_limiter Binpacking can exit without packing all the pods --- .../config/autoscaling_options.go | 21 ++ cluster-autoscaler/core/autoscaler.go | 2 +- cluster-autoscaler/core/scale_test_common.go | 2 +- cluster-autoscaler/core/scale_up.go | 2 +- .../estimator/binpacking_estimator.go | 75 ++++++-- .../estimator/binpacking_estimator_test.go | 181 ++++++++++++------ cluster-autoscaler/estimator/estimator.go | 25 ++- .../estimator/threshold_based_limiter.go | 64 +++++++ .../estimator/threshold_based_limiter_test.go | 129 +++++++++++++ cluster-autoscaler/main.go | 23 ++- 10 files changed, 442 insertions(+), 82 deletions(-) create mode 100644 cluster-autoscaler/estimator/threshold_based_limiter.go create mode 100644 cluster-autoscaler/estimator/threshold_based_limiter_test.go diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 0d60b6f2f356..9c0e1cb9d6bb 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -169,4 +169,25 @@ type AutoscalingOptions struct { DaemonSetEvictionForOccupiedNodes bool // User agent to use for HTTP calls. UserAgent string + // InitialNodeGroupBackoffDuration is the duration of first backoff after a new node failed to start + InitialNodeGroupBackoffDuration time.Duration + // MaxNodeGroupBackoffDuration is the maximum backoff duration for a NodeGroup after new nodes failed to start. + MaxNodeGroupBackoffDuration time.Duration + // NodeGroupBackoffResetTimeout is the time after last failed scale-up when the backoff duration is reset. + NodeGroupBackoffResetTimeout time.Duration + // MaxScaleDownParallelism is the maximum number of nodes (both empty and needing drain) that can be deleted in parallel. + MaxScaleDownParallelism int + // MaxDrainParallelism is the maximum number of nodes needing drain, that can be drained and deleted in parallel. + MaxDrainParallelism int + // GceExpanderEphemeralStorageSupport is whether scale-up takes ephemeral storage resources into account. + GceExpanderEphemeralStorageSupport bool + // RecordDuplicatedEvents controls whether events should be duplicated within a 5 minute window. + RecordDuplicatedEvents bool + // MaxNodesPerScaleUp controls how many nodes can be added in a single scale-up. + // Note that this is strictly a performance optimization aimed at limiting binpacking time, not a tool to rate-limit + // scale-up. There is nothing stopping CA from adding MaxNodesPerScaleUp every loop. + MaxNodesPerScaleUp int + // MaxNodeGroupBinpackingDuration is a maximum time that can be spent binpacking a single NodeGroup. If the threshold + // is exceeded binpacking will be cut short and a partial scale-up will be performed. 
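As a hedged sketch of how the two new binpacking caps combine (the MaxNodeGroupBinpackingDuration field declaration itself follows directly below), with purely illustrative values rather than recommended defaults:

package main

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

func main() {
	// Illustrative values: allow at most 1000 nodes per scale-up, and spend
	// at most 10s binpacking any single NodeGroup before cutting the pass short.
	opts := config.AutoscalingOptions{
		MaxNodesPerScaleUp:             1000,
		MaxNodeGroupBinpackingDuration: 10 * time.Second,
	}
	_ = opts
}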
+ MaxNodeGroupBinpackingDuration time.Duration } diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 62216018769d..33650fec2892 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -110,7 +110,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error { opts.ExpanderStrategy = expanderStrategy } if opts.EstimatorBuilder == nil { - estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName) + estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration)) if err != nil { return err } diff --git a/cluster-autoscaler/core/scale_test_common.go b/cluster-autoscaler/core/scale_test_common.go index 57a4e48d5200..c070ff385231 100644 --- a/cluster-autoscaler/core/scale_test_common.go +++ b/cluster-autoscaler/core/scale_test_common.go @@ -165,7 +165,7 @@ func NewScaleTestAutoscalingContext( } // Ignoring error here is safe - if a test doesn't specify valid estimatorName, // it either doesn't need one, or should fail when it turns out to be nil. - estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName) + estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(0, 0)) predicateChecker, err := simulator.NewTestPredicateChecker() if err != nil { return context.AutoscalingContext{}, err diff --git a/cluster-autoscaler/core/scale_up.go b/cluster-autoscaler/core/scale_up.go index 5ed3cb3e99d3..74f8c9570904 100644 --- a/cluster-autoscaler/core/scale_up.go +++ b/cluster-autoscaler/core/scale_up.go @@ -312,7 +312,7 @@ func computeExpansionOption(context *context.AutoscalingContext, podEquivalenceG if len(option.Pods) > 0 { estimator := context.EstimatorBuilder(context.PredicateChecker, context.ClusterSnapshot) - option.NodeCount = estimator.Estimate(option.Pods, nodeInfo) + option.NodeCount, option.Pods = estimator.Estimate(option.Pods, nodeInfo, option.NodeGroup) } return option, nil diff --git a/cluster-autoscaler/estimator/binpacking_estimator.go b/cluster-autoscaler/estimator/binpacking_estimator.go index 87482f4921f1..5e6134a56d53 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator.go +++ b/cluster-autoscaler/estimator/binpacking_estimator.go @@ -22,6 +22,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" klog "k8s.io/klog/v2" @@ -38,15 +39,18 @@ type podInfo struct { type BinpackingNodeEstimator struct { predicateChecker simulator.PredicateChecker clusterSnapshot simulator.ClusterSnapshot + limiter EstimationLimiter } // NewBinpackingNodeEstimator builds a new BinpackingNodeEstimator. func NewBinpackingNodeEstimator( predicateChecker simulator.PredicateChecker, - clusterSnapshot simulator.ClusterSnapshot) *BinpackingNodeEstimator { + clusterSnapshot simulator.ClusterSnapshot, + limiter EstimationLimiter) *BinpackingNodeEstimator { return &BinpackingNodeEstimator{ predicateChecker: predicateChecker, clusterSnapshot: clusterSnapshot, + limiter: limiter, } } @@ -57,69 +61,102 @@ func NewBinpackingNodeEstimator( // still be maintained. // It is assumed that all pods from the given list can fit to nodeTemplate. // Returns the number of nodes needed to accommodate all pods from the list. 
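The reworked signature below also returns the pods the estimator actually packed, so a caller can tell which pods remain unhelped by this node group. A hedged caller-side sketch; the helper name and the package it lives in are assumptions:

package scaleup

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/estimator"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// estimateAndSplit is a hypothetical helper: it runs the estimator and
// separates out the pods it could not place, which a caller might retry
// against other node groups.
func estimateAndSplit(est estimator.Estimator, pending []*apiv1.Pod,
	template *schedulerframework.NodeInfo, group cloudprovider.NodeGroup) (int, []*apiv1.Pod) {
	count, scheduled := est.Estimate(pending, template, group)
	placed := make(map[*apiv1.Pod]bool, len(scheduled))
	for _, p := range scheduled {
		placed[p] = true
	}
	var unhelped []*apiv1.Pod
	for _, p := range pending {
		if !placed[p] {
			unhelped = append(unhelped, p)
		}
	}
	return count, unhelped
}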
-func (estimator *BinpackingNodeEstimator) Estimate( +func (e *BinpackingNodeEstimator) Estimate( pods []*apiv1.Pod, - nodeTemplate *schedulerframework.NodeInfo) int { + nodeTemplate *schedulerframework.NodeInfo, + nodeGroup cloudprovider.NodeGroup) (int, []*apiv1.Pod) { + + e.limiter.StartEstimation(pods, nodeGroup) + defer e.limiter.EndEstimation() + podInfos := calculatePodScore(pods, nodeTemplate) sort.Slice(podInfos, func(i, j int) bool { return podInfos[i].score > podInfos[j].score }) newNodeNames := make(map[string]bool) + newNodesWithPods := make(map[string]bool) - if err := estimator.clusterSnapshot.Fork(); err != nil { + if err := e.clusterSnapshot.Fork(); err != nil { klog.Errorf("Error while calling ClusterSnapshot.Fork; %v", err) - return 0 + return 0, nil } defer func() { - if err := estimator.clusterSnapshot.Revert(); err != nil { + if err := e.clusterSnapshot.Revert(); err != nil { klog.Fatalf("Error while calling ClusterSnapshot.Revert; %v", err) } }() newNodeNameIndex := 0 + scheduledPods := []*apiv1.Pod{} + lastNodeName := "" for _, podInfo := range podInfos { found := false - nodeName, err := estimator.predicateChecker.FitsAnyNodeMatching(estimator.clusterSnapshot, podInfo.pod, func(nodeInfo *schedulerframework.NodeInfo) bool { + nodeName, err := e.predicateChecker.FitsAnyNodeMatching(e.clusterSnapshot, podInfo.pod, func(nodeInfo *schedulerframework.NodeInfo) bool { return newNodeNames[nodeInfo.Node().Name] }) if err == nil { found = true - if err := estimator.clusterSnapshot.AddPod(podInfo.pod, nodeName); err != nil { + if err := e.clusterSnapshot.AddPod(podInfo.pod, nodeName); err != nil { klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, nodeName, err) - return 0 + return 0, nil } + scheduledPods = append(scheduledPods, podInfo.pod) + newNodesWithPods[nodeName] = true } if !found { + // Stop binpacking if we reach the limit of nodes we can add. + // We return the result of the binpacking that we already performed. + if !e.limiter.PermissionToAddNode() { + break + } + + // If the last node we've added is empty and the pod couldn't schedule on it, it wouldn't be able to schedule + // on a new node either. There is no point adding more nodes to snapshot in such case, especially because of + // performance cost each extra node adds to future FitsAnyNodeMatching calls. + if lastNodeName != "" && !newNodesWithPods[lastNodeName] { + continue + } + // Add new node - newNodeName, err := estimator.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex) + newNodeName, err := e.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex) if err != nil { klog.Errorf("Error while adding new node for template to ClusterSnapshot; %v", err) - return 0 + return 0, nil } newNodeNameIndex++ - // And schedule pod to it - if err := estimator.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil { + newNodeNames[newNodeName] = true + lastNodeName = newNodeName + + // And try to schedule pod to it. + // Note that this may still fail (ex. if topology spreading with zonal topologyKey is used); + // in this case we can't help the pending pod. We keep the node in clusterSnapshot to avoid + // adding and removing node to snapshot for each such pod. 
+ if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, podInfo.pod, newNodeName); err != nil { + continue + } + if err := e.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil { klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, newNodeName, err) - return 0 + return 0, nil } - newNodeNames[newNodeName] = true + newNodesWithPods[newNodeName] = true + scheduledPods = append(scheduledPods, podInfo.pod) } } - return len(newNodeNames) + return len(newNodesWithPods), scheduledPods } -func (estimator *BinpackingNodeEstimator) addNewNodeToSnapshot( +func (e *BinpackingNodeEstimator) addNewNodeToSnapshot( template *schedulerframework.NodeInfo, nameIndex int) (string, error) { - newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("estimator-%d", nameIndex)) + newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", nameIndex)) var pods []*apiv1.Pod for _, podInfo := range newNodeInfo.Pods { pods = append(pods, podInfo.Pod) } - if err := estimator.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil { + if err := e.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil { return "", err } return newNodeInfo.Node().Name, nil diff --git a/cluster-autoscaler/estimator/binpacking_estimator_test.go b/cluster-autoscaler/estimator/binpacking_estimator_test.go index 6aa4ee1b84ec..797d3d28e742 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator_test.go +++ b/cluster-autoscaler/estimator/binpacking_estimator_test.go @@ -22,6 +22,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/autoscaler/cluster-autoscaler/utils/units" @@ -30,89 +31,157 @@ import ( "github.com/stretchr/testify/assert" ) -func makePod(cpuPerPod, memoryPerPod int64) *apiv1.Pod { - return &apiv1.Pod{ +func makePods(cpuPerPod int64, memoryPerPod int64, hostport int32, maxSkew int32, topologySpreadingKey string, podCount int) []*apiv1.Pod { + pod := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "estimatee", + Namespace: "universe", + Labels: map[string]string{ + "app": "estimatee", + }, + }, Spec: apiv1.PodSpec{ Containers: []apiv1.Container{ { Resources: apiv1.ResourceRequirements{ Requests: apiv1.ResourceList{ apiv1.ResourceCPU: *resource.NewMilliQuantity(cpuPerPod, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(memoryPerPod, resource.DecimalSI), + apiv1.ResourceMemory: *resource.NewQuantity(memoryPerPod*units.MiB, resource.DecimalSI), }, }, }, }, }, } -} - -func TestBinpackingEstimate(t *testing.T) { - estimator := newBinPackingEstimator(t) - - cpuPerPod := int64(350) - memoryPerPod := int64(1000 * units.MiB) - pod := makePod(cpuPerPod, memoryPerPod) - - pods := make([]*apiv1.Pod, 0) - for i := 0; i < 10; i++ { + if hostport > 0 { + pod.Spec.Containers[0].Ports = []apiv1.ContainerPort{ + { + HostPort: hostport, + }, + } + } + if maxSkew > 0 { + pod.Spec.TopologySpreadConstraints = []apiv1.TopologySpreadConstraint{ + { + MaxSkew: maxSkew, + TopologyKey: topologySpreadingKey, + WhenUnsatisfiable: "DoNotSchedule", + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "estimatee", + }, + }, + }, + } + } + pods := []*apiv1.Pod{} + for i := 0; i < podCount; i++ { pods = append(pods, pod) } + return pods +} + +func makeNode(cpu int64, mem int64, name 
string, zone string) *apiv1.Node { node := &apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "kubernetes.io/hostname": name, + "topology.kubernetes.io/zone": zone, + }, + }, Status: apiv1.NodeStatus{ Capacity: apiv1.ResourceList{ - apiv1.ResourceCPU: *resource.NewMilliQuantity(cpuPerPod*3-50, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(2*memoryPerPod, resource.DecimalSI), + apiv1.ResourceCPU: *resource.NewMilliQuantity(cpu, resource.DecimalSI), + apiv1.ResourceMemory: *resource.NewQuantity(mem*units.MiB, resource.DecimalSI), apiv1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), }, }, } node.Status.Allocatable = node.Status.Capacity SetNodeReadyState(node, true, time.Time{}) - - nodeInfo := schedulerframework.NewNodeInfo() - nodeInfo.SetNode(node) - estimate := estimator.Estimate(pods, nodeInfo) - assert.Equal(t, 5, estimate) + return node } -func TestBinpackingEstimateWithPorts(t *testing.T) { - estimator := newBinPackingEstimator(t) - - cpuPerPod := int64(200) - memoryPerPod := int64(1000 * units.MiB) - pod := makePod(cpuPerPod, memoryPerPod) - pod.Spec.Containers[0].Ports = []apiv1.ContainerPort{ +func TestBinpackingEstimate(t *testing.T) { + testCases := []struct { + name string + millicores int64 + memory int64 + maxNodes int + pods []*apiv1.Pod + topologySpreadingKey string + expectNodeCount int + expectPodCount int + }{ { - HostPort: 5555, + name: "simple resource-based binpacking", + millicores: 350*3 - 50, + memory: 2 * 1000, + pods: makePods(350, 1000, 0, 0, "", 10), + expectNodeCount: 5, + expectPodCount: 10, }, - } - pods := make([]*apiv1.Pod, 0) - for i := 0; i < 8; i++ { - pods = append(pods, pod) - } - node := &apiv1.Node{ - Status: apiv1.NodeStatus{ - Capacity: apiv1.ResourceList{ - apiv1.ResourceCPU: *resource.NewMilliQuantity(5*cpuPerPod, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(5*memoryPerPod, resource.DecimalSI), - apiv1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), - }, + { + name: "pods-per-node bound binpacking", + millicores: 10000, + memory: 20000, + pods: makePods(10, 100, 0, 0, "", 20), + expectNodeCount: 2, + expectPodCount: 20, + }, + { + name: "hostport conflict forces pod-per-node", + millicores: 1000, + memory: 5000, + pods: makePods(200, 1000, 5555, 0, "", 8), + expectNodeCount: 8, + expectPodCount: 8, + }, + { + name: "limiter cuts binpacking", + millicores: 1000, + memory: 5000, + pods: makePods(500, 1000, 0, 0, "", 20), + maxNodes: 5, + expectNodeCount: 5, + expectPodCount: 10, + }, + { + name: "hostname topology spreading with maxSkew=2 forces 2 pods/node", + millicores: 1000, + memory: 5000, + pods: makePods(20, 100, 0, 2, "kubernetes.io/hostname", 8), + expectNodeCount: 4, + expectPodCount: 8, + }, + { + name: "zonal topology spreading with maxSkew=2 only allows 2 pods to schedule", + millicores: 1000, + memory: 5000, + pods: makePods(20, 100, 0, 2, "topology.kubernetes.io/zone", 8), + expectNodeCount: 1, + expectPodCount: 2, }, } - node.Status.Allocatable = node.Status.Capacity - SetNodeReadyState(node, true, time.Time{}) - - nodeInfo := schedulerframework.NewNodeInfo() - nodeInfo.SetNode(node) - estimate := estimator.Estimate(pods, nodeInfo) - assert.Equal(t, 8, estimate) -} - -func newBinPackingEstimator(t *testing.T) *BinpackingNodeEstimator { - predicateChecker, err := simulator.NewTestPredicateChecker() - clusterSnapshot := simulator.NewBasicClusterSnapshot() - assert.NoError(t, err) - estimator := 
NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot) - return estimator + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + clusterSnapshot := simulator.NewBasicClusterSnapshot() + // Add one node in different zone to trigger topology spread constraints + clusterSnapshot.AddNode(makeNode(100, 100, "oldnode", "zone-jupiter")) + + predicateChecker, err := simulator.NewTestPredicateChecker() + assert.NoError(t, err) + limiter := NewThresholdBasedEstimationLimiter(tc.maxNodes, time.Duration(0)) + estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter) + + node := makeNode(tc.millicores, tc.memory, "template", "zone-mars") + nodeInfo := schedulerframework.NewNodeInfo() + nodeInfo.SetNode(node) + + estimatedNodes, estimatedPods := estimator.Estimate(tc.pods, nodeInfo, nil) + assert.Equal(t, tc.expectNodeCount, estimatedNodes) + assert.Equal(t, tc.expectPodCount, len(estimatedPods)) + }) + } } diff --git a/cluster-autoscaler/estimator/estimator.go b/cluster-autoscaler/estimator/estimator.go index 8a86ec9c66b5..7d4f819dcf19 100644 --- a/cluster-autoscaler/estimator/estimator.go +++ b/cluster-autoscaler/estimator/estimator.go @@ -20,6 +20,7 @@ import ( "fmt" apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/simulator" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -33,22 +34,40 @@ const ( var AvailableEstimators = []string{BinpackingEstimatorName} // Estimator calculates the number of nodes of given type needed to schedule pods. +// It returns the number of new nodes needed as well as the list of pods it managed +// to schedule on those nodes. type Estimator interface { - Estimate([]*apiv1.Pod, *schedulerframework.NodeInfo) int + Estimate([]*apiv1.Pod, *schedulerframework.NodeInfo, cloudprovider.NodeGroup) (int, []*apiv1.Pod) } // EstimatorBuilder creates a new estimator object. type EstimatorBuilder func(simulator.PredicateChecker, simulator.ClusterSnapshot) Estimator // NewEstimatorBuilder creates a new estimator object from flag. -func NewEstimatorBuilder(name string) (EstimatorBuilder, error) { +func NewEstimatorBuilder(name string, limiter EstimationLimiter) (EstimatorBuilder, error) { switch name { case BinpackingEstimatorName: return func( predicateChecker simulator.PredicateChecker, clusterSnapshot simulator.ClusterSnapshot) Estimator { - return NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot) + return NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter) }, nil } return nil, fmt.Errorf("unknown estimator: %s", name) } + +// EstimationLimiter controls how many nodes can be added by Estimator. +// A limiter can be used to prevent costly estimation if an actual ability to +// scale-up is limited by external factors. +type EstimationLimiter interface { + // StartEstimation is called at the start of estimation. + StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup) + // EndEstimation is called at the end of estimation. + EndEstimation() + // PermissionToAddNode is called by an estimator when it wants to add additional + // nodes to simulation. If permission is not granted the Estimator is expected + // not to add any more nodes in this simulation. + // There is no requirement for the Estimator to stop calculations, it's + // just not expected to add any more nodes. 
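The PermissionToAddNode declaration this comment documents follows right after this aside. To make the contract concrete, a hedged sketch of the smallest conforming implementation, a limiter that never caps estimation; it is not part of the patch:

package estimator

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// noLimits is a hypothetical EstimationLimiter that always grants permission
// to add nodes; StartEstimation and EndEstimation have nothing to track.
type noLimits struct{}

func (noLimits) StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup) {}
func (noLimits) EndEstimation()                                        {}
func (noLimits) PermissionToAddNode() bool                             { return true }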
+	PermissionToAddNode() bool
+}
diff --git a/cluster-autoscaler/estimator/threshold_based_limiter.go b/cluster-autoscaler/estimator/threshold_based_limiter.go
new file mode 100644
index 000000000000..4381721d0c53
--- /dev/null
+++ b/cluster-autoscaler/estimator/threshold_based_limiter.go
@@ -0,0 +1,64 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package estimator
+
+import (
+	"time"
+
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	klog "k8s.io/klog/v2"
+)
+
+type thresholdBasedEstimationLimiter struct {
+	maxDuration time.Duration
+	maxNodes    int
+	nodes       int
+	start       time.Time
+}
+
+func (tbel *thresholdBasedEstimationLimiter) StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup) {
+	tbel.start = time.Now()
+	tbel.nodes = 0
+}
+
+func (*thresholdBasedEstimationLimiter) EndEstimation() {}
+
+func (tbel *thresholdBasedEstimationLimiter) PermissionToAddNode() bool {
+	if tbel.maxNodes > 0 && tbel.nodes >= tbel.maxNodes {
+		klog.V(4).Infof("Capping binpacking after exceeding threshold of %d nodes", tbel.maxNodes)
+		return false
+	}
+	timeDefined := tbel.maxDuration > 0 && tbel.start != time.Time{}
+	if timeDefined && time.Now().After(tbel.start.Add(tbel.maxDuration)) {
+		klog.V(4).Infof("Capping binpacking after exceeding max duration of %v", tbel.maxDuration)
+		return false
+	}
+	tbel.nodes++
+	return true
+}
+
+// NewThresholdBasedEstimationLimiter returns an EstimationLimiter that will prevent estimation
+// after either a node count- or time-based threshold is reached. This is meant to prevent cases
+// where binpacking of hundreds or thousands of nodes takes an extremely long time, rendering CA
+// incredibly slow or even completely crashing it.
+func NewThresholdBasedEstimationLimiter(maxNodes int, maxDuration time.Duration) EstimationLimiter {
+	return &thresholdBasedEstimationLimiter{
+		maxNodes:    maxNodes,
+		maxDuration: maxDuration,
+	}
+}
diff --git a/cluster-autoscaler/estimator/threshold_based_limiter_test.go b/cluster-autoscaler/estimator/threshold_based_limiter_test.go
new file mode 100644
index 000000000000..e80b586f3ebb
--- /dev/null
+++ b/cluster-autoscaler/estimator/threshold_based_limiter_test.go
@@ -0,0 +1,129 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package estimator
+
+import (
+	"testing"
+	"time"
+
+	apiv1 "k8s.io/api/core/v1"
+
+	"github.com/stretchr/testify/assert"
+)
+
+type limiterOperation func(*testing.T, EstimationLimiter)
+
+func expectDeny(t *testing.T, l EstimationLimiter) {
+	assert.Equal(t, false, l.PermissionToAddNode())
+}
+
+func expectAllow(t *testing.T, l EstimationLimiter) {
+	assert.Equal(t, true, l.PermissionToAddNode())
+}
+
+func resetLimiter(t *testing.T, l EstimationLimiter) {
+	l.EndEstimation()
+	l.StartEstimation([]*apiv1.Pod{}, nil)
+}
+
+func TestThresholdBasedLimiter(t *testing.T) {
+	testCases := []struct {
+		name            string
+		maxNodes        int
+		maxDuration     time.Duration
+		startDelta      time.Duration
+		operations      []limiterOperation
+		expectNodeCount int
+	}{
+		{
+			name:     "no limiting happens",
+			maxNodes: 20,
+			operations: []limiterOperation{
+				expectAllow,
+				expectAllow,
+				expectAllow,
+			},
+			expectNodeCount: 3,
+		},
+		{
+			name:        "time-based trigger fires",
+			maxNodes:    20,
+			maxDuration: 5 * time.Second,
+			startDelta:  -10 * time.Second,
+			operations: []limiterOperation{
+				expectDeny,
+				expectDeny,
+			},
+			expectNodeCount: 0,
+		},
+		{
+			name:     "sequence of additions works until the threshold is hit",
+			maxNodes: 3,
+			operations: []limiterOperation{
+				expectAllow,
+				expectAllow,
+				expectAllow,
+				expectDeny,
+			},
+			expectNodeCount: 3,
+		},
+		{
+			name:     "node counter is reset",
+			maxNodes: 2,
+			operations: []limiterOperation{
+				expectAllow,
+				expectAllow,
+				expectDeny,
+				resetLimiter,
+				expectAllow,
+			},
+			expectNodeCount: 1,
+		},
+		{
+			name:        "timer is reset",
+			maxNodes:    20,
+			maxDuration: 5 * time.Second,
+			startDelta:  -10 * time.Second,
+			operations: []limiterOperation{
+				expectDeny,
+				resetLimiter,
+				expectAllow,
+				expectAllow,
+			},
+			expectNodeCount: 2,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			limiter := &thresholdBasedEstimationLimiter{
+				maxNodes:    tc.maxNodes,
+				maxDuration: tc.maxDuration,
+			}
+			limiter.StartEstimation([]*apiv1.Pod{}, nil)
+
+			if tc.startDelta != time.Duration(0) {
+				limiter.start = limiter.start.Add(tc.startDelta)
+			}
+
+			for _, op := range tc.operations {
+				op(t, limiter)
+			}
+			assert.Equal(t, tc.expectNodeCount, limiter.nodes)
+			limiter.EndEstimation()
+		})
+	}
+}
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index fa16c2f7f2ff..cd1a101b9590 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -184,7 +184,19 @@ var (
 	daemonSetEvictionForOccupiedNodes = flag.Bool("daemonset-eviction-for-occupied-nodes", true, "DaemonSet pods will be gracefully terminated from non-empty nodes")
 	userAgent                         = flag.String("user-agent", "cluster-autoscaler", "User agent used for HTTP calls.")
-	emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.")
+	emitPerNodeGroupMetrics         = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.")
+	initialNodeGroupBackoffDuration = flag.Duration("initial-node-group-backoff-duration", 5*time.Minute,
+		"initialNodeGroupBackoffDuration is the duration of the first backoff after a new node failed to start.")
+	maxNodeGroupBackoffDuration = flag.Duration("max-node-group-backoff-duration", 30*time.Minute,
+		"maxNodeGroupBackoffDuration is the maximum backoff duration for a NodeGroup after new nodes failed to start.")
+	nodeGroupBackoffResetTimeout = flag.Duration("node-group-backoff-reset-timeout", 3*time.Hour,
+		"nodeGroupBackoffResetTimeout is the time after the last failed scale-up when the backoff duration is reset.")
+	maxScaleDownParallelismFlag        = flag.Int("max-scale-down-parallelism", 10, "Maximum number of nodes (both empty and needing drain) that can be deleted in parallel.")
+	maxDrainParallelismFlag            = flag.Int("max-drain-parallelism", 1, "Maximum number of nodes needing drain that can be drained and deleted in parallel.")
+	gceExpanderEphemeralStorageSupport = flag.Bool("gce-expander-ephemeral-storage-support", false, "Whether scale-up takes ephemeral storage resources into account for the GCE cloud provider.")
+	recordDuplicatedEvents             = flag.Bool("record-duplicated-events", false, "Enable duplication of similar events within a 5-minute window.")
+	maxNodesPerScaleUp                 = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.")
+	maxNodeGroupBinpackingDuration     = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.")
 )
 
 func createAutoscalingOptions() config.AutoscalingOptions {
@@ -264,6 +276,15 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		DaemonSetEvictionForEmptyNodes:     *daemonSetEvictionForEmptyNodes,
 		DaemonSetEvictionForOccupiedNodes:  *daemonSetEvictionForOccupiedNodes,
 		UserAgent:                          *userAgent,
+		InitialNodeGroupBackoffDuration:    *initialNodeGroupBackoffDuration,
+		MaxNodeGroupBackoffDuration:        *maxNodeGroupBackoffDuration,
+		NodeGroupBackoffResetTimeout:       *nodeGroupBackoffResetTimeout,
+		MaxScaleDownParallelism:            *maxScaleDownParallelismFlag,
+		MaxDrainParallelism:                *maxDrainParallelismFlag,
+		GceExpanderEphemeralStorageSupport: *gceExpanderEphemeralStorageSupport,
+		RecordDuplicatedEvents:             *recordDuplicatedEvents,
+		MaxNodesPerScaleUp:                 *maxNodesPerScaleUp,
+		MaxNodeGroupBinpackingDuration:     *maxNodeGroupBinpackingDuration,
 	}
 }
 

From 1bf63e6f63b2c3de607349fa5c2fa1815c85f31e Mon Sep 17 00:00:00 2001
From: Kubernetes Prow Robot
Date: Wed, 21 Sep 2022 08:35:15 -0700
Subject: [PATCH 4/6] Merge pull request #5207 from jayantjain93/patch-2

Create GCE CloudProvider Owners file
---
 cluster-autoscaler/cloudprovider/gce/OWNERS | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
 create mode 100644 cluster-autoscaler/cloudprovider/gce/OWNERS

diff --git a/cluster-autoscaler/cloudprovider/gce/OWNERS b/cluster-autoscaler/cloudprovider/gce/OWNERS
new file mode 100644
index 000000000000..444bc04c026d
--- /dev/null
+++ b/cluster-autoscaler/cloudprovider/gce/OWNERS
@@ -0,0 +1,14 @@
+approvers:
+- jayantjain93
+- maciekpytel
+- towca
+- x13n
+- yaroslava-serdiuk
+- BigDarkClown
+reviewers:
+- jayantjain93
+- maciekpytel
+- towca
+- x13n
+- yaroslava-serdiuk
+- BigDarkClown

From 81dbba034c98659675129d6e950ead0a3ce633c8 Mon Sep 17 00:00:00 2001
From: Kubernetes Prow Robot
Date: Mon, 30 May 2022 10:16:52 -0700
Subject: [PATCH 5/6] Merge pull request #4926 from DataDog/deleteCreatedNodesWithErrors-no-segfault

Don't deref nil nodegroup in deleteCreatedNodesWithErrors
---
 cluster-autoscaler/core/static_autoscaler.go  | 14 ++++--
 .../core/static_autoscaler_test.go            | 50 +++++++++++++++++--
 2 files changed, 57 insertions(+), 7 deletions(-)

diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index 70ead85d1654..f48c67abcff8 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -351,7 +351,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 		return nil
 	}
 
-	if a.deleteCreatedNodesWithErrors() {
+	danglingNodes, err := a.deleteCreatedNodesWithErrors()
+	if err != nil {
+		klog.Warningf("Failed to remove nodes that were created with errors, skipping iteration: %v", err)
+		return nil
+	}
+	if danglingNodes {
 		klog.V(0).Infof("Some nodes that failed to create were removed, skipping iteration")
 		return nil
 	}
@@ -643,7 +648,7 @@ func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNode
 	return removedAny, nil
 }
 
-func (a *StaticAutoscaler) deleteCreatedNodesWithErrors() bool {
+func (a *StaticAutoscaler) deleteCreatedNodesWithErrors() (bool, error) {
 	// We always schedule deleting of incoming erroneous nodes
 	// TODO[lukaszos] Consider adding logic to not retry delete every loop iteration
 	nodes := a.clusterStateRegistry.GetCreatedNodesWithErrors()
@@ -661,6 +666,9 @@ func (a *StaticAutoscaler) deleteCreatedNodesWithErrors() bool {
 			klog.Warningf("Cannot determine nodeGroup for node %v; %v", id, err)
 			continue
 		}
+		if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
+			return false, fmt.Errorf("node %s has no known nodegroup", node.GetName())
+		}
 		nodesToBeDeletedByNodeGroupId[nodeGroup.Id()] = append(nodesToBeDeletedByNodeGroupId[nodeGroup.Id()], node)
 	}
 
@@ -685,7 +693,7 @@ func (a *StaticAutoscaler) deleteCreatedNodesWithErrors() bool {
 		a.clusterStateRegistry.InvalidateNodeInstancesCacheEntry(nodeGroup)
 	}
 
-	return deletedAny
+	return deletedAny, nil
 }
 
 func (a *StaticAutoscaler) nodeGroupsById() map[string]cloudprovider.NodeGroup {
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go
index e51c433fd5b3..a68fbd2d5023 100644
--- a/cluster-autoscaler/core/static_autoscaler_test.go
+++ b/cluster-autoscaler/core/static_autoscaler_test.go
@@ -923,7 +923,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *testing.T) {
 		podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
 }
 
-func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) {
+func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) {
 	// setup
 	provider := &mockprovider.CloudProvider{}
 
@@ -1058,7 +1058,9 @@ func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) {
 	clusterState.UpdateNodes([]*apiv1.Node{}, nil, now)
 
 	// delete nodes with create errors
-	assert.True(t, autoscaler.deleteCreatedNodesWithErrors())
+	removedNodes, err := autoscaler.deleteCreatedNodesWithErrors()
+	assert.True(t, removedNodes)
+	assert.NoError(t, err)
 
 	// check delete was called on correct nodes
 	nodeGroupA.AssertCalled(t, "DeleteNodes", mock.MatchedBy(
@@ -1082,7 +1084,9 @@ func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) {
 	clusterState.UpdateNodes([]*apiv1.Node{}, nil, now)
 
 	// delete nodes with create errors
-	assert.True(t, autoscaler.deleteCreatedNodesWithErrors())
+	removedNodes, err = autoscaler.deleteCreatedNodesWithErrors()
+	assert.True(t, removedNodes)
+	assert.NoError(t, err)
 
 	// nodes should be deleted again
 	nodeGroupA.AssertCalled(t, "DeleteNodes", mock.MatchedBy(
@@ -1145,10 +1149,48 @@ func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) {
 	clusterState.UpdateNodes([]*apiv1.Node{}, nil, now)
 
 	// delete nodes with create errors
-	assert.False(t, autoscaler.deleteCreatedNodesWithErrors())
+	removedNodes, err = autoscaler.deleteCreatedNodesWithErrors()
+	assert.False(t, removedNodes)
+	assert.NoError(t, err)
 
 	// we expect no more Delete Nodes
 	nodeGroupA.AssertNumberOfCalls(t, "DeleteNodes", 2)
+
+	// failed node not included by NodeGroupForNode
+	nodeGroupC := &mockprovider.NodeGroup{}
+	nodeGroupC.On("Exist").Return(true)
+	nodeGroupC.On("Autoprovisioned").Return(false)
+	nodeGroupC.On("TargetSize").Return(1, nil)
+	nodeGroupC.On("Id").Return("C")
+	nodeGroupC.On("DeleteNodes", mock.Anything).Return(nil)
+	nodeGroupC.On("Nodes").Return([]cloudprovider.Instance{
+		{
+			Id: "C1",
+			Status: &cloudprovider.InstanceStatus{
+				State: cloudprovider.InstanceCreating,
+				ErrorInfo: &cloudprovider.InstanceErrorInfo{
+					ErrorClass: cloudprovider.OutOfResourcesErrorClass,
+					ErrorCode:  "QUOTA",
+				},
+			},
+		},
+	}, nil)
+	provider = &mockprovider.CloudProvider{}
+	provider.On("NodeGroups").Return([]cloudprovider.NodeGroup{nodeGroupC})
+	provider.On("NodeGroupForNode", mock.Anything).Return(nil, nil)
+
+	clusterState = clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff())
+	clusterState.RefreshCloudProviderNodeInstancesCache()
+	autoscaler.clusterStateRegistry = clusterState
+
+	// update cluster state
+	clusterState.UpdateNodes([]*apiv1.Node{}, nil, time.Now())
+
+	// return early on failed nodes without matching nodegroups
+	removedNodes, err = autoscaler.deleteCreatedNodesWithErrors()
+	assert.False(t, removedNodes)
+	assert.Error(t, err)
+	nodeGroupC.AssertNumberOfCalls(t, "DeleteNodes", 0)
 }
 
 func TestStaticAutoscalerProcessorCallbacks(t *testing.T) {

From 28b6e2e39504f9ef029bae426d987266086d006b Mon Sep 17 00:00:00 2001
From: Anton Kirillov
Date: Tue, 1 Nov 2022 19:13:05 -0700
Subject: [PATCH 6/6] fix function name in tests

---
 cluster-autoscaler/core/static_autoscaler_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go
index a68fbd2d5023..78cfbbce8693 100644
--- a/cluster-autoscaler/core/static_autoscaler_test.go
+++ b/cluster-autoscaler/core/static_autoscaler_test.go
@@ -1179,7 +1179,7 @@ func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) {
 	provider.On("NodeGroups").Return([]cloudprovider.NodeGroup{nodeGroupC})
 	provider.On("NodeGroupForNode", mock.Anything).Return(nil, nil)
 
-	clusterState = clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff())
+	clusterState = clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, newBackoff())
 	clusterState.RefreshCloudProviderNodeInstancesCache()
 	autoscaler.clusterStateRegistry = clusterState
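
For orientation, a minimal sketch of how the pieces in this series fit together at startup. The exact call site is not shown in these patches, so the wiring below is an assumption: `opts` stands for the AutoscalingOptions built by createAutoscalingOptions above, and `opts.EstimatorName` is the name taken from the existing --estimator flag.

	// Build a threshold-based limiter from the new options; a zero value
	// disables the corresponding threshold.
	limiter := estimator.NewThresholdBasedEstimationLimiter(
		opts.MaxNodesPerScaleUp,             // node-count cap per binpacking run
		opts.MaxNodeGroupBinpackingDuration, // time cap per binpacking run
	)
	// Hand the limiter to the estimator builder so every binpacking
	// estimator created from it is capped by these thresholds.
	estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, limiter)
	if err != nil {
		klog.Fatalf("Failed to create estimator builder: %v", err)
	}

With this wiring, each scale-up simulation stops requesting new nodes once either threshold trips, while the Estimate call still returns the nodes and pods it managed to place before the cap.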