From a0ae713fbadf91cb668b3db7526cd2944dc44c91 Mon Sep 17 00:00:00 2001 From: David Morrison Date: Fri, 12 Nov 2021 12:44:45 -0800 Subject: [PATCH 1/7] convert registeredAsgs to a map --- .../cloudprovider/aws/auto_scaling_groups.go | 65 ++++++++----------- .../cloudprovider/aws/aws_cloud_provider.go | 13 ++-- .../aws/aws_cloud_provider_test.go | 4 +- .../cloudprovider/aws/aws_manager.go | 2 +- .../cloudprovider/aws/aws_manager_test.go | 6 +- .../cloudprovider/aws/instance_type_cache.go | 2 +- .../aws/instance_type_cache_test.go | 7 +- 7 files changed, 48 insertions(+), 51 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index 1f85b0906780..b262ddd84d73 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -36,7 +36,7 @@ const ( ) type asgCache struct { - registeredAsgs []*asg + registeredAsgs map[AwsRef]*asg asgToInstances map[AwsRef][]AwsInstanceRef instanceToAsg map[AwsInstanceRef]*asg asgInstanceTypeCache *instanceTypeExpirationStore @@ -75,7 +75,7 @@ type asg struct { func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) (*asgCache, error) { registry := &asgCache{ - registeredAsgs: make([]*asg, 0), + registeredAsgs: make(map[AwsRef]*asg, 0), awsService: awsService, asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), @@ -121,53 +121,44 @@ func (m *asgCache) parseExplicitAsgs(specs []string) error { // Register ASG. Returns the registered ASG. func (m *asgCache) register(asg *asg) *asg { - for i := range m.registeredAsgs { - if existing := m.registeredAsgs[i]; existing.AwsRef == asg.AwsRef { - if reflect.DeepEqual(existing, asg) { - return existing - } + if existing, asgExists := m.registeredAsgs[asg.AwsRef]; asgExists { + if reflect.DeepEqual(existing, asg) { + return existing + } - klog.V(4).Infof("Updating ASG %s", asg.AwsRef.Name) + klog.V(4).Infof("Updating ASG %s", asg.AwsRef.Name) - // Explicit registered groups should always use the manually provided min/max - // values and the not the ones returned by the API - if !m.explicitlyConfigured[asg.AwsRef] { - existing.minSize = asg.minSize - existing.maxSize = asg.maxSize - } + // Explicit registered groups should always use the manually provided min/max + // values and the not the ones returned by the API + if !m.explicitlyConfigured[asg.AwsRef] { + existing.minSize = asg.minSize + existing.maxSize = asg.maxSize + } - existing.curSize = asg.curSize + existing.curSize = asg.curSize - // Those information are mainly required to create templates when scaling - // from zero - existing.AvailabilityZones = asg.AvailabilityZones - existing.LaunchConfigurationName = asg.LaunchConfigurationName - existing.LaunchTemplate = asg.LaunchTemplate - existing.MixedInstancesPolicy = asg.MixedInstancesPolicy - existing.Tags = asg.Tags + // Those information are mainly required to create templates when scaling + // from zero + existing.AvailabilityZones = asg.AvailabilityZones + existing.LaunchConfigurationName = asg.LaunchConfigurationName + existing.LaunchTemplate = asg.LaunchTemplate + existing.MixedInstancesPolicy = asg.MixedInstancesPolicy + existing.Tags = asg.Tags - return existing - } + return existing } klog.V(1).Infof("Registering ASG %s", asg.AwsRef.Name) - m.registeredAsgs = append(m.registeredAsgs, asg) + m.registeredAsgs[asg.AwsRef] = asg return asg } // Unregister ASG. Returns the unregistered ASG. func (m *asgCache) unregister(a *asg) *asg { - updated := make([]*asg, 0, len(m.registeredAsgs)) - var changed *asg - for _, existing := range m.registeredAsgs { - if existing.AwsRef == a.AwsRef { - klog.V(1).Infof("Unregistered ASG %s", a.AwsRef.Name) - changed = a - continue - } - updated = append(updated, existing) + if _, asgExists := m.registeredAsgs[a.AwsRef]; asgExists { + klog.V(1).Infof("Unregistered ASG %s", a.AwsRef.Name) + delete(m.registeredAsgs, a.AwsRef) } - m.registeredAsgs = updated - return changed + return a } func (m *asgCache) buildAsgFromSpec(spec string) (*asg, error) { @@ -184,7 +175,7 @@ func (m *asgCache) buildAsgFromSpec(spec string) (*asg, error) { } // Get returns the currently registered ASGs -func (m *asgCache) Get() []*asg { +func (m *asgCache) Get() map[AwsRef]*asg { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go index 8357966b59cf..6e951b6ca88b 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go @@ -86,12 +86,12 @@ func (aws *awsCloudProvider) GetAvailableGPUTypes() map[string]struct{} { // NodeGroups returns all node groups configured for this cloud provider. func (aws *awsCloudProvider) NodeGroups() []cloudprovider.NodeGroup { asgs := aws.awsManager.getAsgs() - ngs := make([]cloudprovider.NodeGroup, len(asgs)) - for i, asg := range asgs { - ngs[i] = &AwsNodeGroup{ + ngs := make([]cloudprovider.NodeGroup, 0, len(asgs)) + for _, asg := range asgs { + ngs = append(ngs, &AwsNodeGroup{ asg: asg, awsManager: aws.awsManager, - } + }) } return ngs @@ -320,7 +320,10 @@ func (ng *AwsNodeGroup) Nodes() ([]cloudprovider.Instance, error) { instances := make([]cloudprovider.Instance, len(asgNodes)) for i, asgNode := range asgNodes { - instances[i] = cloudprovider.Instance{Id: asgNode.ProviderID} + instances[i] = cloudprovider.Instance{ + Id: asgNode.ProviderID, + Status: nil, + } } return instances, nil } diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 7c61340ec0e8..7eeb82ae5758 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -29,7 +29,7 @@ import ( var testAwsManager = &AwsManager{ asgCache: &asgCache{ - registeredAsgs: make([]*asg, 0), + registeredAsgs: make(map[AwsRef]*asg, 0), asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), interrupt: make(chan struct{}), @@ -43,7 +43,7 @@ func newTestAwsManagerWithMockServices(mockAutoScaling autoScalingI, mockEC2 ec2 return &AwsManager{ awsService: awsService, asgCache: &asgCache{ - registeredAsgs: make([]*asg, 0), + registeredAsgs: make(map[AwsRef]*asg, 0), asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), asgInstanceTypeCache: newAsgInstanceTypeCache(&awsService), diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index 8648a37a1819..e21d249c0c8d 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -273,7 +273,7 @@ func (m *AwsManager) Cleanup() { m.asgCache.Cleanup() } -func (m *AwsManager) getAsgs() []*asg { +func (m *AwsManager) getAsgs() map[AwsRef]*asg { return m.asgCache.Get() } diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go index d18efa2439e6..2392ed98f264 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go @@ -358,6 +358,7 @@ func makeTaintSet(taints []apiv1.Taint) map[apiv1.Taint]bool { func TestFetchExplicitAsgs(t *testing.T) { min, max, groupname := 1, 10, "coolasg" + asgRef := AwsRef{Name: groupname} a := &autoScalingMock{} a.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{ @@ -408,7 +409,7 @@ func TestFetchExplicitAsgs(t *testing.T) { asgs := m.asgCache.Get() assert.Equal(t, 1, len(asgs)) - validateAsg(t, asgs[0], groupname, min, max) + validateAsg(t, asgs[asgRef], groupname, min, max) } func TestGetASGTemplate(t *testing.T) { @@ -504,6 +505,7 @@ func TestGetASGTemplate(t *testing.T) { func TestFetchAutoAsgs(t *testing.T) { min, max := 1, 10 groupname, tags := "coolasg", []string{"tag", "anothertag"} + asgRef := AwsRef{Name: groupname} a := &autoScalingMock{} // Lookup groups associated with tags @@ -561,7 +563,7 @@ func TestFetchAutoAsgs(t *testing.T) { asgs := m.asgCache.Get() assert.Equal(t, 1, len(asgs)) - validateAsg(t, asgs[0], groupname, min, max) + validateAsg(t, asgs[asgRef], groupname, min, max) // Simulate the previously discovered ASG disappearing a.On("DescribeTagsPages", mock.MatchedBy(tagsMatcher(expectedTagsInput)), diff --git a/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go b/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go index 2b2ce74c8044..90c7f2fd2c4d 100644 --- a/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go +++ b/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go @@ -84,7 +84,7 @@ func (c *jitterClock) Since(ts time.Time) time.Duration { return since } -func (es instanceTypeExpirationStore) populate(autoscalingGroups []*asg) error { +func (es instanceTypeExpirationStore) populate(autoscalingGroups map[AwsRef]*asg) error { asgsToQuery := []*asg{} if c, ok := es.jitterClock.(*jitterClock); ok { diff --git a/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go b/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go index 94d0018e20cc..88cc75f087a8 100644 --- a/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go +++ b/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go @@ -79,9 +79,10 @@ func TestLTVersionChange(t *testing.T) { m := newAsgInstanceTypeCacheWithClock(&awsWrapper{a, e, nil}, fakeClock, fakeStore) for i := 0; i < 2; i++ { - err := m.populate([]*asg{ - { - AwsRef: AwsRef{Name: asgName}, + asgRef := AwsRef{Name: asgName} + err := m.populate(map[AwsRef]*asg{ + asgRef: { + AwsRef: asgRef, LaunchTemplate: &launchTemplate{ name: ltName, version: aws.StringValue(ltVersions[i]), From 8ac87b3f34a43e3af9f7011531870ef5dc724bed Mon Sep 17 00:00:00 2001 From: David Morrison Date: Fri, 12 Nov 2021 15:00:01 -0800 Subject: [PATCH 2/7] early abort if we get a failed scaling activity event from AWS --- .../cloudprovider/aws/auto_scaling_groups.go | 68 +++++++++++++++++-- .../cloudprovider/aws/aws_cloud_provider.go | 16 ++++- .../aws/aws_cloud_provider_test.go | 7 ++ .../cloudprovider/aws/aws_manager.go | 4 ++ .../cloudprovider/aws/aws_manager_test.go | 14 ++++ .../cloudprovider/aws/aws_wrapper.go | 1 + .../cloudprovider/aws/aws_wrapper_test.go | 5 ++ 7 files changed, 109 insertions(+), 6 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index b262ddd84d73..730874f2f17b 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -31,14 +31,16 @@ import ( ) const ( - scaleToZeroSupported = true - placeholderInstanceNamePrefix = "i-placeholder" + scaleToZeroSupported = true + placeholderInstanceNamePrefix = "i-placeholder" + placeholderUnfulfillableStatus = "placeholder-cannot-be-fullfilled" ) type asgCache struct { registeredAsgs map[AwsRef]*asg asgToInstances map[AwsRef][]AwsInstanceRef instanceToAsg map[AwsInstanceRef]*asg + instanceStatus map[AwsInstanceRef]*string asgInstanceTypeCache *instanceTypeExpirationStore mutex sync.Mutex awsService *awsWrapper @@ -62,9 +64,10 @@ type mixedInstancesPolicy struct { type asg struct { AwsRef - minSize int - maxSize int - curSize int + minSize int + maxSize int + curSize int + lastUpdateTime *time.Time AvailabilityZones []string LaunchConfigurationName string @@ -79,6 +82,7 @@ func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySp awsService: awsService, asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), + instanceStatus: make(map[AwsInstanceRef]*string), asgInstanceTypeCache: newAsgInstanceTypeCache(awsService), interrupt: make(chan struct{}), asgAutoDiscoverySpecs: autoDiscoverySpecs, @@ -217,6 +221,17 @@ func (m *asgCache) InstancesByAsg(ref AwsRef) ([]AwsInstanceRef, error) { return nil, fmt.Errorf("error while looking for instances of ASG: %s", ref) } +func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if status, found := m.instanceStatus[ref]; found { + return status, nil + } + + return nil, fmt.Errorf("could not find instance %v", ref) +} + func (m *asgCache) SetAsgSize(asg *asg, size int) error { m.mutex.Lock() defer m.mutex.Unlock() @@ -239,6 +254,7 @@ func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error { } // Proactively set the ASG size so autoscaler makes better decisions + asg.lastUpdateTime = &start asg.curSize = size return nil @@ -358,6 +374,7 @@ func (m *asgCache) regenerate() error { newInstanceToAsgCache := make(map[AwsInstanceRef]*asg) newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef) + newInstanceStatusMap := make(map[AwsInstanceRef]*string) // Build list of known ASG names refreshNames, err := m.buildAsgNames() @@ -394,6 +411,7 @@ func (m *asgCache) regenerate() error { ref := m.buildInstanceRefFromAWS(instance) newInstanceToAsgCache[ref] = asg newAsgToInstancesCache[asg.AwsRef][i] = ref + newInstanceStatusMap[ref] = instance.HealthStatus } } @@ -422,6 +440,7 @@ func (m *asgCache) regenerate() error { m.asgToInstances = newAsgToInstancesCache m.instanceToAsg = newInstanceToAsgCache m.autoscalingOptions = newAutoscalingOptions + m.instanceStatus = newInstanceStatusMap return nil } @@ -435,17 +454,56 @@ func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*aut klog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. "+ "Creating placeholder instances.", *g.AutoScalingGroupName, realInstances, desired) + + healthStatus := "" + isAvailable, err := m.isNodeGroupAvailable(g) + if err != nil { + klog.V(4).Infof("Could not check instance availability, creating placeholder node anyways: %v", err) + } else if !isAvailable { + klog.Warningf("Instance group %s cannot provision any more nodes!", *g.AutoScalingGroupName) + healthStatus = placeholderUnfulfillableStatus + } + for i := realInstances; i < desired; i++ { id := fmt.Sprintf("%s-%s-%d", placeholderInstanceNamePrefix, *g.AutoScalingGroupName, i) g.Instances = append(g.Instances, &autoscaling.Instance{ InstanceId: &id, AvailabilityZone: g.AvailabilityZones[0], + HealthStatus: &healthStatus, }) } } return groups } +func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) { + input := &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: group.AutoScalingGroupName, + MaxRecords: aws.Int64(1), // We only care about the most recent event + } + + start := time.Now() + response, err := m.awsService.DescribeScalingActivities(input) + observeAWSRequest("DescribeScalingActivities", err, start) + if err != nil { + return true, err // If we can't describe the scaling activities we assume the node group is available + } + + if len(response.Activities) > 0 { + activity := response.Activities[0] + asgRef := AwsRef{Name: *group.AutoScalingGroupName} + if a, ok := m.registeredAsgs[asgRef]; ok { + lut := a.lastUpdateTime + if lut != nil && activity.StartTime.After(*lut) && *activity.StatusCode == "Failed" { + return false, nil + } + } else { + klog.V(4).Infof("asg %v is not registered yet, skipping DescribeScalingActivities check", asgRef.Name) + } + } + return true, nil +} + func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) { spec := dynamic.NodeGroupSpec{ Name: aws.StringValue(g.AutoScalingGroupName), diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go index 6e951b6ca88b..4bf11960209d 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go @@ -320,9 +320,23 @@ func (ng *AwsNodeGroup) Nodes() ([]cloudprovider.Instance, error) { instances := make([]cloudprovider.Instance, len(asgNodes)) for i, asgNode := range asgNodes { + var status *cloudprovider.InstanceStatus + instanceStatusString, err := ng.awsManager.GetInstanceStatus(asgNode) + if err != nil { + klog.V(4).Infof("Could not get instance status, continuing anyways: %v", err) + } else if instanceStatusString != nil && *instanceStatusString == placeholderUnfulfillableStatus { + status = &cloudprovider.InstanceStatus{ + State: cloudprovider.InstanceCreating, + ErrorInfo: &cloudprovider.InstanceErrorInfo{ + ErrorClass: cloudprovider.OutOfResourcesErrorClass, + ErrorCode: placeholderUnfulfillableStatus, + ErrorMessage: "AWS cannot provision any more instances for this node group", + }, + } + } instances[i] = cloudprovider.Instance{ Id: asgNode.ProviderID, - Status: nil, + Status: status, } } return instances, nil diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 7eeb82ae5758..31635966f760 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -463,6 +463,13 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { expectedInstancesCount = 1 }).Return(nil) + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + MaxRecords: aws.Int64(1), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}) + provider.Refresh() initialSize, err := asgs[0].TargetSize() diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index e21d249c0c8d..0f246db3bb6f 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -301,6 +301,10 @@ func (m *AwsManager) GetAsgNodes(ref AwsRef) ([]AwsInstanceRef, error) { return m.asgCache.InstancesByAsg(ref) } +func (m *AwsManager) GetInstanceStatus(ref AwsInstanceRef) (*string, error) { + return m.asgCache.InstanceStatus(ref) +} + func (m *AwsManager) getAsgTemplate(asg *asg) (*asgTemplate, error) { if len(asg.AvailabilityZones) < 1 { return nil, fmt.Errorf("unable to get first AvailabilityZone for ASG %q", asg.Name) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go index 2392ed98f264..9d4a9ce78567 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go @@ -391,6 +391,13 @@ func TestFetchExplicitAsgs(t *testing.T) { }}, false) }).Return(nil) + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("coolasg"), + MaxRecords: aws.Int64(1), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}) + do := cloudprovider.NodeGroupDiscoveryOptions{ // Register the same node group twice with different max nodes. // The intention is to test that the asgs.Register method will update @@ -549,6 +556,13 @@ func TestFetchAutoAsgs(t *testing.T) { }}}, false) }).Return(nil).Twice() + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("coolasg"), + MaxRecords: aws.Int64(1), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}) + do := cloudprovider.NodeGroupDiscoveryOptions{ NodeGroupAutoDiscoverySpecs: []string{fmt.Sprintf("asg:tag=%s", strings.Join(tags, ","))}, } diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go index 86453300b757..1c7ef4eaa1ce 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go @@ -33,6 +33,7 @@ import ( type autoScalingI interface { DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error DescribeLaunchConfigurations(*autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error) + DescribeScalingActivities(*autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error) DescribeTagsPages(input *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error) TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go index 4fc2778db03e..f83a7bf5b7d6 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go @@ -45,6 +45,11 @@ func (a *autoScalingMock) DescribeLaunchConfigurations(i *autoscaling.DescribeLa return args.Get(0).(*autoscaling.DescribeLaunchConfigurationsOutput), nil } +func (a *autoScalingMock) DescribeScalingActivities(i *autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error) { + args := a.Called(i) + return args.Get(0).(*autoscaling.DescribeScalingActivitiesOutput), nil +} + func (a *autoScalingMock) DescribeTagsPages(i *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error { args := a.Called(i, fn) return args.Error(0) From 933065db73bf6210bab6d9e99ec26e598da07290 Mon Sep 17 00:00:00 2001 From: David Morrison Date: Wed, 17 Nov 2021 11:03:23 -0800 Subject: [PATCH 3/7] adding tests --- .../aws/auto_scaling_groups_test.go | 121 ++++++++++++++++++ .../aws/aws_cloud_provider_test.go | 2 +- .../cloudprovider/aws/aws_manager_test.go | 4 +- .../cloudprovider/aws/aws_wrapper_test.go | 2 +- 4 files changed, 125 insertions(+), 4 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go index e9f9ad1ab79e..3deff9cc5700 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go @@ -17,8 +17,12 @@ limitations under the License. package aws import ( + "errors" "testing" + "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/stretchr/testify/assert" ) @@ -46,3 +50,120 @@ func validateAsg(t *testing.T, asg *asg, name string, minSize int, maxSize int) assert.Equal(t, minSize, asg.minSize) assert.Equal(t, maxSize, asg.maxSize) } + +func TestCreatePlaceholders(t *testing.T) { + registeredAsgName := aws.String("test-asg") + registeredAsgRef := AwsRef{Name: *registeredAsgName} + + cases := []struct { + name string + desiredCapacity *int64 + activities []*autoscaling.Activity + groupLastUpdateTime *time.Time + describeErr error + asgToCheck *string + }{ + { + name: "add placeholders successful", + desiredCapacity: aws.Int64(10), + }, + { + name: "no placeholders needed", + desiredCapacity: aws.Int64(0), + }, + { + name: "DescribeScalingActivities failed", + desiredCapacity: aws.Int64(1), + describeErr: errors.New("timeout"), + }, + { + name: "early abort if AWS scaling up fails", + desiredCapacity: aws.Int64(1), + activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StartTime: aws.Time(time.Unix(10, 0)), + }, + }, + groupLastUpdateTime: aws.Time(time.Unix(9, 0)), + }, + { + name: "AWS scaling failed event before CA scale_up", + desiredCapacity: aws.Int64(1), + activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StartTime: aws.Time(time.Unix(9, 0)), + }, + }, + groupLastUpdateTime: aws.Time(time.Unix(10, 0)), + }, + { + name: "asg not registered", + desiredCapacity: aws.Int64(10), + activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StartTime: aws.Time(time.Unix(10, 0)), + }, + }, + groupLastUpdateTime: aws.Time(time.Unix(9, 0)), + asgToCheck: aws.String("unregisteredAsgName"), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + shouldCallDescribeScalingActivities := true + if *tc.desiredCapacity == int64(0) { + shouldCallDescribeScalingActivities = false + } + + asgName := registeredAsgName + if tc.asgToCheck != nil { + asgName = tc.asgToCheck + } + + a := &autoScalingMock{} + if shouldCallDescribeScalingActivities { + a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: asgName, + MaxRecords: aws.Int64(1), + }).Return( + &autoscaling.DescribeScalingActivitiesOutput{Activities: tc.activities}, + tc.describeErr, + ).Once() + } + + asgCache := &asgCache{ + awsService: &awsWrapper{ + autoScalingI: a, + ec2I: nil, + }, + registeredAsgs: map[AwsRef]*asg{ + registeredAsgRef: &asg{ + AwsRef: registeredAsgRef, + lastUpdateTime: tc.groupLastUpdateTime, + }, + }, + } + + groups := []*autoscaling.Group{ + { + AutoScalingGroupName: asgName, + AvailabilityZones: []*string{aws.String("westeros-1a")}, + DesiredCapacity: tc.desiredCapacity, + Instances: []*autoscaling.Instance{}, + }, + } + asgCache.createPlaceholdersForDesiredNonStartedInstances(groups) + assert.Equal(t, int64(len(groups[0].Instances)), *tc.desiredCapacity) + if tc.activities != nil && *tc.activities[0].StatusCode == "Failed" && tc.activities[0].StartTime.After(*tc.groupLastUpdateTime) && asgName == registeredAsgName { + assert.Equal(t, *groups[0].Instances[0].HealthStatus, placeholderUnfulfillableStatus) + } else if len(groups[0].Instances) > 0 { + assert.Equal(t, *groups[0].Instances[0].HealthStatus, "") + } + a.AssertExpectations(t) + }) + } +} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 31635966f760..2753daabd2dc 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -468,7 +468,7 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { AutoScalingGroupName: aws.String("test-asg"), MaxRecords: aws.Int64(1), }, - ).Return(&autoscaling.DescribeScalingActivitiesOutput{}) + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) provider.Refresh() diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go index 9d4a9ce78567..79d8a2d619a3 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go @@ -396,7 +396,7 @@ func TestFetchExplicitAsgs(t *testing.T) { AutoScalingGroupName: aws.String("coolasg"), MaxRecords: aws.Int64(1), }, - ).Return(&autoscaling.DescribeScalingActivitiesOutput{}) + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) do := cloudprovider.NodeGroupDiscoveryOptions{ // Register the same node group twice with different max nodes. @@ -561,7 +561,7 @@ func TestFetchAutoAsgs(t *testing.T) { AutoScalingGroupName: aws.String("coolasg"), MaxRecords: aws.Int64(1), }, - ).Return(&autoscaling.DescribeScalingActivitiesOutput{}) + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) do := cloudprovider.NodeGroupDiscoveryOptions{ NodeGroupAutoDiscoverySpecs: []string{fmt.Sprintf("asg:tag=%s", strings.Join(tags, ","))}, diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go index f83a7bf5b7d6..e360f341c8b6 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go @@ -47,7 +47,7 @@ func (a *autoScalingMock) DescribeLaunchConfigurations(i *autoscaling.DescribeLa func (a *autoScalingMock) DescribeScalingActivities(i *autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error) { args := a.Called(i) - return args.Get(0).(*autoscaling.DescribeScalingActivitiesOutput), nil + return args.Get(0).(*autoscaling.DescribeScalingActivitiesOutput), args.Error(1) } func (a *autoScalingMock) DescribeTagsPages(i *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error { From aa381a89588f8e780281fd5247df26301b9dcc82 Mon Sep 17 00:00:00 2001 From: David Morrison Date: Mon, 22 Nov 2021 15:22:30 -0800 Subject: [PATCH 4/7] update the readmes with appropriate IAM permissions --- cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md | 1 + cluster-autoscaler/cloudprovider/aws/README.md | 1 + 2 files changed, 2 insertions(+) diff --git a/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md b/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md index cf251787c234..f638a9808d6b 100644 --- a/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md +++ b/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md @@ -67,6 +67,7 @@ __NOTE:__ Please see [the README](README.md#IAM-Policy) for more information on "autoscaling:DescribeAutoScalingGroups", "autoscaling:DescribeAutoScalingInstances", "autoscaling:DescribeLaunchConfigurations", + "autoscaling:DescribeScalingActivities", "autoscaling:DescribeTags", "ec2:DescribeInstanceTypes", "ec2:DescribeLaunchTemplateVersions" diff --git a/cluster-autoscaler/cloudprovider/aws/README.md b/cluster-autoscaler/cloudprovider/aws/README.md index c4064ad7e931..956f797ade9b 100644 --- a/cluster-autoscaler/cloudprovider/aws/README.md +++ b/cluster-autoscaler/cloudprovider/aws/README.md @@ -93,6 +93,7 @@ binary, replacing min and max node counts and the ASG: "autoscaling:DescribeAutoScalingGroups", "autoscaling:DescribeAutoScalingInstances", "autoscaling:DescribeLaunchConfigurations", + "autoscaling:DescribeScalingActivities", "autoscaling:SetDesiredCapacity", "autoscaling:TerminateInstanceInAutoScalingGroup" ], From aebd984e43b3c7bddeb1c35ae4994b236250225e Mon Sep 17 00:00:00 2001 From: David Morrison Date: Tue, 30 Nov 2021 10:53:45 -0800 Subject: [PATCH 5/7] fix linting errors --- cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go | 2 +- .../cloudprovider/aws/auto_scaling_groups_test.go | 2 +- cluster-autoscaler/cloudprovider/aws/aws_manager.go | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index 730874f2f17b..ac344c3dbc2b 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -33,7 +33,7 @@ import ( const ( scaleToZeroSupported = true placeholderInstanceNamePrefix = "i-placeholder" - placeholderUnfulfillableStatus = "placeholder-cannot-be-fullfilled" + placeholderUnfulfillableStatus = "placeholder-cannot-be-fulfilled" ) type asgCache struct { diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go index 3deff9cc5700..d28519f29c0c 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go @@ -141,7 +141,7 @@ func TestCreatePlaceholders(t *testing.T) { ec2I: nil, }, registeredAsgs: map[AwsRef]*asg{ - registeredAsgRef: &asg{ + registeredAsgRef: { AwsRef: registeredAsgRef, lastUpdateTime: tc.groupLastUpdateTime, }, diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index 0f246db3bb6f..17d29e8ff707 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -301,6 +301,7 @@ func (m *AwsManager) GetAsgNodes(ref AwsRef) ([]AwsInstanceRef, error) { return m.asgCache.InstancesByAsg(ref) } +// GetInstanceStatus returns the status of ASG nodes func (m *AwsManager) GetInstanceStatus(ref AwsInstanceRef) (*string, error) { return m.asgCache.InstanceStatus(ref) } From 8d608ac03e93df6fc7f6359115da0cb224fc586a Mon Sep 17 00:00:00 2001 From: David Morrison Date: Wed, 2 Mar 2022 13:02:24 -0800 Subject: [PATCH 6/7] check all scaling activities instead of just the last one --- .../cloudprovider/aws/auto_scaling_groups.go | 11 ++++++----- .../cloudprovider/aws/auto_scaling_groups_test.go | 10 +++++----- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index ac344c3dbc2b..d39f4685f63b 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -67,7 +67,7 @@ type asg struct { minSize int maxSize int curSize int - lastUpdateTime *time.Time + lastUpdateTime time.Time AvailabilityZones []string LaunchConfigurationName string @@ -254,7 +254,7 @@ func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error { } // Proactively set the ASG size so autoscaler makes better decisions - asg.lastUpdateTime = &start + asg.lastUpdateTime = start asg.curSize = size return nil @@ -489,12 +489,13 @@ func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) return true, err // If we can't describe the scaling activities we assume the node group is available } - if len(response.Activities) > 0 { - activity := response.Activities[0] + for _, activity := range response.Activities { asgRef := AwsRef{Name: *group.AutoScalingGroupName} if a, ok := m.registeredAsgs[asgRef]; ok { lut := a.lastUpdateTime - if lut != nil && activity.StartTime.After(*lut) && *activity.StatusCode == "Failed" { + if activity.StartTime.Before(lut) { + break + } else if *activity.StatusCode == "Failed" { return false, nil } } else { diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go index d28519f29c0c..f619af1e2da6 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go @@ -59,7 +59,7 @@ func TestCreatePlaceholders(t *testing.T) { name string desiredCapacity *int64 activities []*autoscaling.Activity - groupLastUpdateTime *time.Time + groupLastUpdateTime time.Time describeErr error asgToCheck *string }{ @@ -85,7 +85,7 @@ func TestCreatePlaceholders(t *testing.T) { StartTime: aws.Time(time.Unix(10, 0)), }, }, - groupLastUpdateTime: aws.Time(time.Unix(9, 0)), + groupLastUpdateTime: time.Unix(9, 0), }, { name: "AWS scaling failed event before CA scale_up", @@ -96,7 +96,7 @@ func TestCreatePlaceholders(t *testing.T) { StartTime: aws.Time(time.Unix(9, 0)), }, }, - groupLastUpdateTime: aws.Time(time.Unix(10, 0)), + groupLastUpdateTime: time.Unix(10, 0), }, { name: "asg not registered", @@ -107,7 +107,7 @@ func TestCreatePlaceholders(t *testing.T) { StartTime: aws.Time(time.Unix(10, 0)), }, }, - groupLastUpdateTime: aws.Time(time.Unix(9, 0)), + groupLastUpdateTime: time.Unix(9, 0), asgToCheck: aws.String("unregisteredAsgName"), }, } @@ -158,7 +158,7 @@ func TestCreatePlaceholders(t *testing.T) { } asgCache.createPlaceholdersForDesiredNonStartedInstances(groups) assert.Equal(t, int64(len(groups[0].Instances)), *tc.desiredCapacity) - if tc.activities != nil && *tc.activities[0].StatusCode == "Failed" && tc.activities[0].StartTime.After(*tc.groupLastUpdateTime) && asgName == registeredAsgName { + if tc.activities != nil && *tc.activities[0].StatusCode == "Failed" && tc.activities[0].StartTime.After(tc.groupLastUpdateTime) && asgName == registeredAsgName { assert.Equal(t, *groups[0].Instances[0].HealthStatus, placeholderUnfulfillableStatus) } else if len(groups[0].Instances) > 0 { assert.Equal(t, *groups[0].Instances[0].HealthStatus, "") From ad93c8ba910aa998eb0bbab7faed19e559ccaee9 Mon Sep 17 00:00:00 2001 From: David Morrison Date: Thu, 3 Mar 2022 10:08:37 -0800 Subject: [PATCH 7/7] log failure information --- cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go | 2 +- .../cloudprovider/aws/auto_scaling_groups_test.go | 1 - cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go | 1 - cluster-autoscaler/cloudprovider/aws/aws_manager_test.go | 2 -- 4 files changed, 1 insertion(+), 5 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index d39f4685f63b..f15c778d9eee 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -479,7 +479,6 @@ func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*aut func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) { input := &autoscaling.DescribeScalingActivitiesInput{ AutoScalingGroupName: group.AutoScalingGroupName, - MaxRecords: aws.Int64(1), // We only care about the most recent event } start := time.Now() @@ -496,6 +495,7 @@ func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) if activity.StartTime.Before(lut) { break } else if *activity.StatusCode == "Failed" { + klog.Warningf("ASG %s scaling failed with %s", asgRef.Name, *activity) return false, nil } } else { diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go index f619af1e2da6..c5078b03b5b7 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go @@ -128,7 +128,6 @@ func TestCreatePlaceholders(t *testing.T) { if shouldCallDescribeScalingActivities { a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ AutoScalingGroupName: asgName, - MaxRecords: aws.Int64(1), }).Return( &autoscaling.DescribeScalingActivitiesOutput{Activities: tc.activities}, tc.describeErr, diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 2753daabd2dc..e1d6a92a1ef9 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -466,7 +466,6 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ AutoScalingGroupName: aws.String("test-asg"), - MaxRecords: aws.Int64(1), }, ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go index 79d8a2d619a3..50df508f8df6 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go @@ -394,7 +394,6 @@ func TestFetchExplicitAsgs(t *testing.T) { a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ AutoScalingGroupName: aws.String("coolasg"), - MaxRecords: aws.Int64(1), }, ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) @@ -559,7 +558,6 @@ func TestFetchAutoAsgs(t *testing.T) { a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ AutoScalingGroupName: aws.String("coolasg"), - MaxRecords: aws.Int64(1), }, ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)