diff --git a/cmd/controller/main.go b/cmd/controller/main.go index ac43af17342c..8b22ca2b992d 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -59,6 +59,7 @@ func main() { op.GetClient(), op.EventRecorder, op.UnavailableOfferingsCache, + op.SSMCache, cloudProvider, op.SubnetProvider, op.SecurityGroupProvider, diff --git a/pkg/apis/v1/ec2nodeclass.go b/pkg/apis/v1/ec2nodeclass.go index 2eb3ed4f1ef5..1d78aa577d0f 100644 --- a/pkg/apis/v1/ec2nodeclass.go +++ b/pkg/apis/v1/ec2nodeclass.go @@ -487,16 +487,40 @@ func (in *EC2NodeClass) AMIFamily() string { if in.Spec.AMIFamily != nil { return *in.Spec.AMIFamily } - if term, ok := lo.Find(in.Spec.AMISelectorTerms, func(t AMISelectorTerm) bool { - return t.Alias != "" - }); ok { - return AMIFamilyFromAlias(term.Alias) + if alias := in.Alias(); alias != nil { + return alias.Family } // Unreachable: validation enforces that one of the above conditions must be met return AMIFamilyCustom } -func AMIFamilyFromAlias(alias string) string { +type Alias struct { + Family string + Version string +} + +const ( + AliasVersionLatest = "latest" +) + +func (a *Alias) String() string { + return fmt.Sprintf("%s@%s", a.Family, a.Version) +} + +func (in *EC2NodeClass) Alias() *Alias { + term, ok := lo.Find(in.Spec.AMISelectorTerms, func(term AMISelectorTerm) bool { + return term.Alias != "" + }) + if !ok { + return nil + } + return &Alias{ + Family: amiFamilyFromAlias(term.Alias), + Version: amiVersionFromAlias(term.Alias), + } +} + +func amiFamilyFromAlias(alias string) string { components := strings.Split(alias, "@") if len(components) != 2 { log.Fatalf("failed to parse AMI alias %q, invalid format", alias) @@ -516,7 +540,7 @@ func AMIFamilyFromAlias(alias string) string { return family } -func AMIVersionFromAlias(alias string) string { +func amiVersionFromAlias(alias string) string { components := strings.Split(alias, "@") if len(components) != 2 { log.Fatalf("failed to parse AMI alias %q, invalid format", alias) diff --git a/pkg/apis/v1/ec2nodeclass_conversion.go b/pkg/apis/v1/ec2nodeclass_conversion.go index 3cdda080e176..d3502c29e4b9 100644 --- a/pkg/apis/v1/ec2nodeclass_conversion.go +++ b/pkg/apis/v1/ec2nodeclass_conversion.go @@ -52,15 +52,10 @@ func (in *EC2NodeClass) ConvertTo(ctx context.Context, to apis.Convertible) erro v1beta1enc.Spec.AMIFamily = lo.ToPtr(in.AMIFamily()) } - if term, ok := lo.Find(in.Spec.AMISelectorTerms, func(term AMISelectorTerm) bool { - return term.Alias != "" - }); ok { - version := AMIVersionFromAlias(term.Alias) - if version != "latest" { - v1beta1enc.Annotations = lo.Assign(v1beta1enc.Annotations, map[string]string{ - AnnotationAliasVersionCompatibilityKey: version, - }) - } + if alias := in.Alias(); alias != nil && alias.Version != AliasVersionLatest { + v1beta1enc.Annotations = lo.Assign(v1beta1enc.Annotations, map[string]string{ + AnnotationAliasVersionCompatibilityKey: alias.Version, + }) } in.Spec.convertTo(&v1beta1enc.Spec) diff --git a/pkg/apis/v1/zz_generated.deepcopy.go b/pkg/apis/v1/zz_generated.deepcopy.go index ebe7f92cf5fe..6e902cd24e31 100644 --- a/pkg/apis/v1/zz_generated.deepcopy.go +++ b/pkg/apis/v1/zz_generated.deepcopy.go @@ -69,6 +69,21 @@ func (in *AMISelectorTerm) DeepCopy() *AMISelectorTerm { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Alias) DeepCopyInto(out *Alias) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Alias. +func (in *Alias) DeepCopy() *Alias { + if in == nil { + return nil + } + out := new(Alias) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BlockDevice) DeepCopyInto(out *BlockDevice) { *out = *in diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 39e85e172713..f9671cdf4b1b 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -31,9 +31,9 @@ const ( InstanceTypesAndZonesTTL = 5 * time.Minute // InstanceProfileTTL is the time before we refresh checking instance profile existence at IAM InstanceProfileTTL = 15 * time.Minute - // SSMProviderTTL is the time to drop SSM Provider data. This only queries EKS Optimized AMI + // SSMCacheTTL is the time to drop SSM Parameters by path data. This only queries EKS Optimized AMI // releases, so we should expect this to be updated relatively infrequently. - SSMProviderTTL = 24 * time.Hour + SSMCacheTTL = 24 * time.Hour ) const ( diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 01fd3b1f3ef0..8c6341af858f 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -19,6 +19,9 @@ import ( "sigs.k8s.io/karpenter/pkg/cloudprovider" + "github.com/patrickmn/go-cache" + + ssminvalidation "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/ssm/invalidation" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/aws-sdk-go/aws/session" @@ -31,7 +34,7 @@ import ( "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/operator/controller" - "github.com/aws/karpenter-provider-aws/pkg/cache" + awscache "github.com/aws/karpenter-provider-aws/pkg/cache" "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption" nodeclaimgarbagecollection "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclaim/garbagecollection" nodeclaimtagging "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclaim/tagging" @@ -46,15 +49,28 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/providers/subnet" ) -func NewControllers(ctx context.Context, sess *session.Session, clk clock.Clock, kubeClient client.Client, recorder events.Recorder, - unavailableOfferings *cache.UnavailableOfferings, cloudProvider cloudprovider.CloudProvider, subnetProvider *subnet.Provider, - securityGroupProvider *securitygroup.Provider, instanceProfileProvider *instanceprofile.Provider, instanceProvider *instance.Provider, - pricingProvider *pricing.Provider, amiProvider *amifamily.Provider, launchTemplateProvider *launchtemplate.Provider) []controller.Controller { - +func NewControllers( + ctx context.Context, + sess *session.Session, + clk clock.Clock, + kubeClient client.Client, + recorder events.Recorder, + unavailableOfferings *awscache.UnavailableOfferings, + ssmCache *cache.Cache, + cloudProvider cloudprovider.CloudProvider, + subnetProvider *subnet.Provider, + securityGroupProvider *securitygroup.Provider, + instanceProfileProvider *instanceprofile.Provider, + instanceProvider *instance.Provider, + pricingProvider *pricing.Provider, + amiProvider *amifamily.Provider, + launchTemplateProvider *launchtemplate.Provider, +) []controller.Controller { controllers := []controller.Controller{ nodeclass.NewController(kubeClient, recorder, subnetProvider, securityGroupProvider, amiProvider, instanceProfileProvider, launchTemplateProvider), nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider), nodeclaimtagging.NewController(kubeClient, instanceProvider), + ssminvalidation.NewController(ssmCache, amiProvider), } if options.FromContext(ctx).InterruptionQueue != "" { controllers = append(controllers, interruption.NewController(kubeClient, clk, recorder, lo.Must(sqs.NewProvider(ctx, servicesqs.New(sess), options.FromContext(ctx).InterruptionQueue)), unavailableOfferings)) diff --git a/pkg/controllers/providers/ssm/invalidation/controller.go b/pkg/controllers/providers/ssm/invalidation/controller.go new file mode 100644 index 000000000000..7ab367be50f1 --- /dev/null +++ b/pkg/controllers/providers/ssm/invalidation/controller.go @@ -0,0 +1,91 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package invalidation + +import ( + "context" + "time" + + "github.com/patrickmn/go-cache" + "github.com/samber/lo" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/karpenter/pkg/operator/controller" + "sigs.k8s.io/karpenter/pkg/operator/injection" + + "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1" + "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" + "github.com/aws/karpenter-provider-aws/pkg/providers/ssm" +) + +// The SSM Invalidation controller is responsible for invalidating "latest" SSM parameters when they point to deprecated +// AMIs. This can occur when an EKS-optimized AMI with a regression is released, and the AMI team chooses to deprecate +// the AMI. Normally, SSM parameter cache entries expire after 24 hours to prevent a thundering herd upon a new AMI +// release, however Karpenter should react faster when an AMI is deprecated. This controller will ensure Karpenter +// reacts to AMI deprecations within it's polling period (30m). +type Controller struct { + cache *cache.Cache + amiProvider *amifamily.Provider +} + +func NewController(ssmCache *cache.Cache, amiProvider *amifamily.Provider) *Controller { + return &Controller{ + cache: ssmCache, + amiProvider: amiProvider, + } +} + +func (c *Controller) Name() string { + return "providers.ssm.invalidation" +} + +func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, c.Name()) + + amiIDsToParameters := map[string]ssm.Parameter{} + for _, item := range c.cache.Items() { + entry := item.Object.(ssm.CacheEntry) + if !entry.Parameter.IsMutable { + continue + } + amiIDsToParameters[entry.Value] = entry.Parameter + } + amis := []amifamily.AMI{} + for _, nodeClass := range lo.Map(lo.Keys(amiIDsToParameters), func(amiID string, _ int) *v1beta1.EC2NodeClass { + return &v1beta1.EC2NodeClass{ + Spec: v1beta1.EC2NodeClassSpec{ + AMISelectorTerms: []v1beta1.AMISelectorTerm{{ID: amiID}}, + }, + } + }) { + resolvedAMIs, err := c.amiProvider.Get(ctx, nodeClass, nil) + if err != nil { + return reconcile.Result{}, err + } + amis = append(amis, resolvedAMIs...) + } + for _, ami := range amis { + if !ami.Deprecated { + continue + } + parameter := amiIDsToParameters[ami.AmiID] + c.cache.Delete(parameter.CacheKey()) + } + return reconcile.Result{RequeueAfter: 30 * time.Minute}, nil +} + +func (c *Controller) Builder(_ context.Context, m manager.Manager) controller.Builder { + return controller.NewSingletonManagedBy(m) +} diff --git a/pkg/controllers/providers/ssm/invalidation/suite_test.go b/pkg/controllers/providers/ssm/invalidation/suite_test.go new file mode 100644 index 000000000000..ccecfdc1939b --- /dev/null +++ b/pkg/controllers/providers/ssm/invalidation/suite_test.go @@ -0,0 +1,145 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package invalidation_test + +import ( + "context" + "testing" + "time" + + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/samber/lo" + "sigs.k8s.io/controller-runtime/pkg/client" + coreoptions "sigs.k8s.io/karpenter/pkg/operator/options" + "sigs.k8s.io/karpenter/pkg/operator/scheme" + coretest "sigs.k8s.io/karpenter/pkg/test" + "sigs.k8s.io/karpenter/pkg/test/v1alpha1" + + "github.com/aws/karpenter-provider-aws/pkg/apis" + "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1" + "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/ssm/invalidation" + "github.com/aws/karpenter-provider-aws/pkg/operator/options" + "github.com/aws/karpenter-provider-aws/pkg/providers/ssm" + "github.com/aws/karpenter-provider-aws/pkg/test" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "knative.dev/pkg/logging/testing" + . "sigs.k8s.io/karpenter/pkg/test/expectations" +) + +var ctx context.Context +var stop context.CancelFunc +var env *coretest.Environment +var awsEnv *test.Environment +var invalidationController *invalidation.Controller + +func TestAWS(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "SSM Invalidation Controller") +} + +var _ = BeforeSuite(func() { + env = coretest.NewEnvironment(scheme.Scheme, coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...)) + ctx = coreoptions.ToContext(ctx, coretest.Options()) + ctx = options.ToContext(ctx, test.Options()) + ctx, stop = context.WithCancel(ctx) + awsEnv = test.NewEnvironment(ctx, env) + + invalidationController = invalidation.NewController(awsEnv.SSMCache, awsEnv.AMIProvider) +}) + +var _ = AfterSuite(func() { + stop() + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = BeforeEach(func() { + ctx = coreoptions.ToContext(ctx, coretest.Options()) + ctx = options.ToContext(ctx, test.Options()) + awsEnv.Reset() +}) + +var _ = Describe("SSM Invalidation Controller", func() { + var nodeClass *v1beta1.EC2NodeClass + BeforeEach(func() { + nodeClass = &v1beta1.EC2NodeClass{ + Spec: v1beta1.EC2NodeClassSpec{ + AMIFamily: &v1beta1.AMIFamilyAL2, + }, + } + }) + It("shouldn't invalidate cache entries for non-deprecated AMIs", func() { + _, err := awsEnv.AMIProvider.Get(ctx, nodeClass, nil) + Expect(err).To(BeNil()) + currentEntries := getSSMCacheEntries() + Expect(len(currentEntries)).To(Equal(3)) + awsEnv.EC2Cache.Flush() + ExpectReconcileSucceeded(ctx, invalidationController, client.ObjectKey{}) + awsEnv.SSMAPI.Reset() + _, err = awsEnv.AMIProvider.Get(ctx, nodeClass, nil) + Expect(err).To(BeNil()) + updatedEntries := getSSMCacheEntries() + Expect(len(updatedEntries)).To(Equal(3)) + for parameter, amiID := range currentEntries { + updatedAMIID, ok := updatedEntries[parameter] + Expect(ok).To(BeTrue()) + Expect(updatedAMIID).To(Equal(amiID)) + } + }) + It("should invalidate cache entries for deprecated AMIs when the SSM parameter is mutable", func() { + _, err := awsEnv.AMIProvider.Get(ctx, nodeClass, nil) + Expect(err).To(BeNil()) + currentEntries := getSSMCacheEntries() + deprecateAMIs(lo.Values(currentEntries)...) + Expect(len(currentEntries)).To(Equal(3)) + awsEnv.EC2Cache.Flush() + ExpectReconcileSucceeded(ctx, invalidationController, client.ObjectKey{}) + awsEnv.SSMAPI.Reset() + _, err = awsEnv.AMIProvider.Get(ctx, nodeClass, nil) + Expect(err).To(BeNil()) + updatedEntries := getSSMCacheEntries() + Expect(len(updatedEntries)).To(Equal(3)) + for parameter, amiID := range currentEntries { + updatedAMIID, ok := updatedEntries[parameter] + Expect(ok).To(BeTrue()) + Expect(updatedAMIID).ToNot(Equal(amiID)) + } + }) +}) + +func getSSMCacheEntries() map[string]string { + entries := map[string]string{} + for _, item := range awsEnv.SSMCache.Items() { + entry := item.Object.(ssm.CacheEntry) + entries[entry.Parameter.Name] = entry.Value + } + return entries +} + +func deprecateAMIs(amiIDs ...string) { + awsEnv.EC2API.DescribeImagesOutput.Set(&ec2.DescribeImagesOutput{ + Images: lo.Map(amiIDs, func(amiID string, _ int) *ec2.Image { + return &ec2.Image{ + Name: lo.ToPtr(coretest.RandomName()), + ImageId: lo.ToPtr(amiID), + CreationDate: lo.ToPtr(awsEnv.Clock.Now().Add(-24 * time.Hour).Format(time.RFC3339)), + Architecture: lo.ToPtr("x86_64"), + DeprecationTime: lo.ToPtr(awsEnv.Clock.Now().Add(-12 * time.Hour).Format(time.RFC3339)), + } + }), + }) +} diff --git a/pkg/fake/ssmapi.go b/pkg/fake/ssmapi.go index b9d0e1711d84..c772d55d842c 100644 --- a/pkg/fake/ssmapi.go +++ b/pkg/fake/ssmapi.go @@ -18,10 +18,9 @@ import ( "context" "fmt" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/mitchellh/hashstructure/v2" + "github.com/Pallinder/go-randomdata" + "github.com/samber/lo" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/ssm" "github.com/aws/aws-sdk-go/service/ssm/ssmiface" @@ -32,36 +31,54 @@ type SSMAPI struct { Parameters map[string]string GetParameterOutput *ssm.GetParameterOutput WantErr error + + defaultParameters map[string]string } func NewSSMAPI() *SSMAPI { - return &SSMAPI{} + return &SSMAPI{ + defaultParameters: map[string]string{}, + } } func (a SSMAPI) GetParameterWithContext(_ context.Context, input *ssm.GetParameterInput, _ ...request.Option) (*ssm.GetParameterOutput, error) { + parameter := lo.FromPtr(input.Name) if a.WantErr != nil { - return nil, a.WantErr + return &ssm.GetParameterOutput{}, a.WantErr } - if len(a.Parameters) > 0 { - if amiID, ok := a.Parameters[*input.Name]; ok { - return &ssm.GetParameterOutput{ - Parameter: &ssm.Parameter{Value: aws.String(amiID)}, - }, nil - } - return nil, awserr.New(ssm.ErrCodeParameterNotFound, fmt.Sprintf("%s couldn't be found", *input.Name), nil) - } - hc, _ := hashstructure.Hash(input.Name, hashstructure.FormatV2, nil) if a.GetParameterOutput != nil { return a.GetParameterOutput, nil } + if len(a.Parameters) != 0 { + value, ok := a.Parameters[parameter] + if !ok { + return &ssm.GetParameterOutput{}, fmt.Errorf("parameter %q not found", lo.FromPtr(input.Name)) + } + return &ssm.GetParameterOutput{ + Parameter: &ssm.Parameter{ + Name: lo.ToPtr(parameter), + Value: lo.ToPtr(value), + }, + }, nil + } + // Cache default parameters that was successive calls for the same parameter return the same result + value, ok := a.defaultParameters[parameter] + if !ok { + value = fmt.Sprintf("ami-%s", randomdata.Alphanumeric(16)) + a.defaultParameters[parameter] = value + } return &ssm.GetParameterOutput{ - Parameter: &ssm.Parameter{Value: aws.String(fmt.Sprintf("test-ami-id-%x", hc))}, + Parameter: &ssm.Parameter{ + Name: lo.ToPtr(parameter), + Value: lo.ToPtr(value), + }, }, nil } func (a *SSMAPI) Reset() { - a.GetParameterOutput = nil a.Parameters = nil + a.GetParameterOutput = nil a.WantErr = nil + a.defaultParameters = map[string]string{} } diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 1d7ea8c1ad67..d3d190eb234e 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -77,6 +77,7 @@ type Operator struct { Session *session.Session UnavailableOfferingsCache *awscache.UnavailableOfferings + SSMCache *cache.Cache EC2API ec2iface.EC2API SubnetProvider *subnet.Provider SecurityGroupProvider *securitygroup.Provider @@ -135,6 +136,8 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont } unavailableOfferingsCache := awscache.NewUnavailableOfferings() + ssmCache := cache.New(awscache.SSMCacheTTL, awscache.DefaultCleanupInterval) + subnetProvider := subnet.NewProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)) securityGroupProvider := securitygroup.NewProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)) instanceProfileProvider := instanceprofile.NewProvider(*sess.Config.Region, iam.New(sess), cache.New(awscache.InstanceProfileTTL, awscache.DefaultCleanupInterval)) @@ -145,8 +148,8 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont *sess.Config.Region, ) versionProvider := version.NewProvider(operator.KubernetesInterface, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)) - ssmProvider := ssmp.NewDefaultProvider(ssm.New(sess), cache.New(awscache.SSMProviderTTL, awscache.DefaultCleanupInterval)) - amiProvider := amifamily.NewProvider(versionProvider, ssmProvider, ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)) + ssmProvider := ssmp.NewDefaultProvider(ssm.New(sess), ssmCache) + amiProvider := amifamily.NewProvider(operator.Clock, versionProvider, ssmProvider, ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)) amiResolver := amifamily.New(amiProvider) launchTemplateProvider := launchtemplate.NewProvider( ctx, @@ -191,6 +194,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont Operator: operator, Session: sess, UnavailableOfferingsCache: unavailableOfferingsCache, + SSMCache: ssmCache, EC2API: ec2api, SubnetProvider: subnetProvider, SecurityGroupProvider: securityGroupProvider, diff --git a/pkg/providers/amifamily/ami.go b/pkg/providers/amifamily/ami.go index e122a04a8565..67cac088bdc9 100644 --- a/pkg/providers/amifamily/ami.go +++ b/pkg/providers/amifamily/ami.go @@ -29,6 +29,7 @@ import ( "github.com/patrickmn/go-cache" "github.com/samber/lo" v1 "k8s.io/api/core/v1" + "k8s.io/utils/clock" "knative.dev/pkg/logging" "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1" @@ -47,6 +48,7 @@ type Provider struct { cm *pretty.ChangeMonitor versionProvider *version.Provider ssmProvider ssm.Provider + clk clock.Clock } type AMI struct { @@ -54,6 +56,7 @@ type AMI struct { AmiID string CreationDate string Requirements scheduling.Requirements + Deprecated bool } type AMIs []AMI @@ -97,13 +100,14 @@ func (a AMIs) MapToInstanceTypes(instanceTypes []*cloudprovider.InstanceType) ma return amiIDs } -func NewProvider(versionProvider *version.Provider, ssmProvider ssm.Provider, ec2api ec2iface.EC2API, cache *cache.Cache) *Provider { +func NewProvider(clock clock.Clock, versionProvider *version.Provider, ssmProvider ssm.Provider, ec2api ec2iface.EC2API, cache *cache.Cache) *Provider { return &Provider{ cache: cache, ec2api: ec2api, cm: pretty.NewChangeMonitor(), ssmProvider: ssmProvider, versionProvider: versionProvider, + clk: clock, } } @@ -162,6 +166,7 @@ func (p *Provider) getDefaultAMIs(ctx context.Context, nodeClass *v1beta1.EC2Nod if res[j].AmiID == aws.StringValue(page.Images[i].ImageId) { res[j].Name = aws.StringValue(page.Images[i].Name) res[j].CreationDate = aws.StringValue(page.Images[i].CreationDate) + res[j].Deprecated = p.IsDeprecated(page.Images[i]) } } } @@ -173,8 +178,11 @@ func (p *Provider) getDefaultAMIs(ctx context.Context, nodeClass *v1beta1.EC2Nod return res, nil } -func (p *Provider) resolveSSMParameter(ctx context.Context, ssmQuery string) (string, error) { - imageID, err := p.ssmProvider.Get(ctx, ssmQuery) +func (p *Provider) resolveSSMParameter(ctx context.Context, name string) (string, error) { + imageID, err := p.ssmProvider.Get(ctx, ssm.Parameter{ + Name: name, + IsMutable: true, + }) if err != nil { return "", err } @@ -222,6 +230,7 @@ func (p *Provider) getAMIs(ctx context.Context, terms []v1beta1.AMISelectorTerm) AmiID: lo.FromPtr(page.Images[i].ImageId), CreationDate: lo.FromPtr(page.Images[i].CreationDate), Requirements: reqs, + Deprecated: p.IsDeprecated(page.Images[i]), } } return true @@ -292,3 +301,13 @@ func (p *Provider) getRequirementsFromImage(ec2Image *ec2.Image) scheduling.Requ requirements.Add(scheduling.NewRequirement(v1.LabelArchStable, v1.NodeSelectorOpIn, architecture)) return requirements } + +func (p *Provider) IsDeprecated(image *ec2.Image) bool { + if image.DeprecationTime == nil { + return false + } + if deprecationTime := lo.Must(time.Parse(time.RFC3339, *image.DeprecationTime)); deprecationTime.After(p.clk.Now()) { + return false + } + return true +} diff --git a/pkg/providers/ssm/provider.go b/pkg/providers/ssm/provider.go index 85c51463212a..7e005124272a 100644 --- a/pkg/providers/ssm/provider.go +++ b/pkg/providers/ssm/provider.go @@ -19,7 +19,6 @@ import ( "fmt" "sync" - "github.com/aws/aws-sdk-go/service/ssm" "github.com/aws/aws-sdk-go/service/ssm/ssmiface" "github.com/patrickmn/go-cache" "github.com/samber/lo" @@ -27,7 +26,7 @@ import ( ) type Provider interface { - Get(context.Context, string) (string, error) + Get(context.Context, Parameter) (string, error) } type DefaultProvider struct { @@ -43,19 +42,20 @@ func NewDefaultProvider(ssmapi ssmiface.SSMAPI, cache *cache.Cache) *DefaultProv } } -func (p *DefaultProvider) Get(ctx context.Context, parameter string) (string, error) { +func (p *DefaultProvider) Get(ctx context.Context, parameter Parameter) (string, error) { p.Lock() defer p.Unlock() - if result, ok := p.cache.Get(parameter); ok { - return result.(string), nil + if entry, ok := p.cache.Get(parameter.CacheKey()); ok { + return entry.(CacheEntry).Value, nil } - result, err := p.ssmapi.GetParameterWithContext(ctx, &ssm.GetParameterInput{ - Name: lo.ToPtr(parameter), - }) + result, err := p.ssmapi.GetParameterWithContext(ctx, parameter.GetParameterInput()) if err != nil { - return "", fmt.Errorf("getting ssm parameter %q, %w", parameter, err) + return "", fmt.Errorf("getting ssm parameter %q, %w", parameter.Name, err) } - p.cache.SetDefault(parameter, lo.FromPtr(result.Parameter.Value)) - logging.FromContext(ctx).With("parameter", parameter, "value", lo.FromPtr(result.Parameter.Value)).Info("discovered ssm parameter") + p.cache.SetDefault(parameter.CacheKey(), CacheEntry{ + Parameter: parameter, + Value: lo.FromPtr(result.Parameter.Value), + }) + logging.FromContext(ctx).With("parameter", parameter.Name, "value", result.Parameter.Value).Info("discovered ssm parameter") return lo.FromPtr(result.Parameter.Value), nil } diff --git a/pkg/providers/ssm/types.go b/pkg/providers/ssm/types.go new file mode 100644 index 000000000000..818f97027b8c --- /dev/null +++ b/pkg/providers/ssm/types.go @@ -0,0 +1,43 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ssm + +import ( + "github.com/aws/aws-sdk-go/service/ssm" + "github.com/samber/lo" +) + +type Parameter struct { + Name string + // IsMutable indicates if the value associated with an SSM parameter is expected to change. An example of a mutable + // parameter would be any of the "latest" or "recommended" AMI parameters which are updated each time a new AMI is + // released. On the otherhand, we would consider a parameter parameter for a specific AMI version to be immutable. + IsMutable bool +} + +func (p *Parameter) GetParameterInput() *ssm.GetParameterInput { + return &ssm.GetParameterInput{ + Name: lo.ToPtr(p.Name), + } +} + +func (p *Parameter) CacheKey() string { + return p.Name +} + +type CacheEntry struct { + Parameter Parameter + Value string +} diff --git a/pkg/test/environment.go b/pkg/test/environment.go index 237da4f5500e..da006d00d300 100644 --- a/pkg/test/environment.go +++ b/pkg/test/environment.go @@ -17,11 +17,12 @@ package test import ( "context" "net" + "time" "github.com/patrickmn/go-cache" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" - "knative.dev/pkg/ptr" + clock "k8s.io/utils/clock/testing" corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1" "sigs.k8s.io/karpenter/pkg/operator/scheme" @@ -36,7 +37,7 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/karpenter-provider-aws/pkg/providers/pricing" "github.com/aws/karpenter-provider-aws/pkg/providers/securitygroup" - "github.com/aws/karpenter-provider-aws/pkg/providers/ssm" + ssmp "github.com/aws/karpenter-provider-aws/pkg/providers/ssm" "github.com/aws/karpenter-provider-aws/pkg/providers/subnet" "github.com/aws/karpenter-provider-aws/pkg/providers/version" @@ -66,7 +67,7 @@ type Environment struct { SubnetCache *cache.Cache SecurityGroupCache *cache.Cache InstanceProfileCache *cache.Cache - SSMProviderCache *cache.Cache + SSMCache *cache.Cache // Providers InstanceTypesProvider *instancetype.Provider @@ -79,9 +80,13 @@ type Environment struct { AMIResolver *amifamily.Resolver VersionProvider *version.Provider LaunchTemplateProvider *launchtemplate.Provider + + Clock *clock.FakeClock } func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment { + clock := &clock.FakeClock{} + // API ec2api := fake.NewEC2API() ssmapi := fake.NewSSMAPI() @@ -96,7 +101,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment subnetCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) securityGroupCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) instanceProfileCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) - ssmProviderCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) + ssmCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) fakePricingAPI := &fake.PricingAPI{} // Providers @@ -105,8 +110,8 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment securityGroupProvider := securitygroup.NewProvider(ec2api, securityGroupCache) versionProvider := version.NewProvider(env.KubernetesInterface, kubernetesVersionCache) instanceProfileProvider := instanceprofile.NewProvider(fake.DefaultRegion, iamapi, instanceProfileCache) - ssmProvider := ssm.NewDefaultProvider(ssmapi, ssmProviderCache) - amiProvider := amifamily.NewProvider(versionProvider, ssmProvider, ec2api, ec2Cache) + ssmProvider := ssmp.NewDefaultProvider(ssmapi, ssmCache) + amiProvider := amifamily.NewProvider(clock, versionProvider, ssmProvider, ec2api, ec2Cache) amiResolver := amifamily.New(amiProvider) instanceTypesProvider := instancetype.NewProvider(fake.DefaultRegion, instanceTypeCache, ec2api, subnetProvider, unavailableOfferingsCache, pricingProvider) launchTemplateProvider := @@ -118,7 +123,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment securityGroupProvider, subnetProvider, instanceProfileProvider, - ptr.String("ca-bundle"), + lo.ToPtr("ca-bundle"), make(chan struct{}), net.ParseIP("10.0.100.10"), "https://test-cluster", @@ -147,7 +152,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment SecurityGroupCache: securityGroupCache, InstanceProfileCache: instanceProfileCache, UnavailableOfferingsCache: unavailableOfferingsCache, - SSMProviderCache: ssmProviderCache, + SSMCache: ssmCache, InstanceTypesProvider: instanceTypesProvider, InstanceProvider: instanceProvider, @@ -159,10 +164,14 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment AMIProvider: amiProvider, AMIResolver: amiResolver, VersionProvider: versionProvider, + + Clock: clock, } } func (env *Environment) Reset() { + env.Clock.SetTime(time.Now()) + env.EC2API.Reset() env.SSMAPI.Reset() env.IAMAPI.Reset() @@ -177,7 +186,7 @@ func (env *Environment) Reset() { env.SubnetCache.Flush() env.SecurityGroupCache.Flush() env.InstanceProfileCache.Flush() - env.SSMProviderCache.Flush() + env.SSMCache.Flush() mfs, err := crmetrics.Registry.Gather() if err != nil {