Skip to content

Commit

Permalink
feat: Discover Instance Type Memory Capacity (#7004)
Browse files Browse the repository at this point in the history
Signed-off-by: jukie <10012479+Jukie@users.noreply.github.com>
Co-authored-by: Jason Deal <dealj@umich.edu>
  • Loading branch information
jukie and jmdeal authored Oct 21, 2024
1 parent 8184043 commit f9b3292
Show file tree
Hide file tree
Showing 12 changed files with 344 additions and 21 deletions.
1 change: 1 addition & 0 deletions hack/docs/instancetypes_gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ below are the resources available with some assumptions and after the instance o
subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval))
instanceTypeProvider := instancetype.NewDefaultProvider(
cache.New(awscache.InstanceTypesAndZonesTTL, awscache.DefaultCleanupInterval),
cache.New(awscache.DiscoveredCapacityCacheTTL, awscache.DefaultCleanupInterval),
ec2api,
subnetProvider,
instancetype.NewDefaultResolver(
Expand Down
1 change: 1 addition & 0 deletions hack/tools/launchtemplate_counter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func main() {
subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval))
instanceTypeProvider := instancetype.NewDefaultProvider(
cache.New(awscache.InstanceTypesAndZonesTTL, awscache.DefaultCleanupInterval),
cache.New(awscache.DiscoveredCapacityCacheTTL, awscache.DefaultCleanupInterval),
ec2api,
subnetProvider,
instancetype.NewDefaultResolver(
Expand Down
3 changes: 3 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const (
// SSMGetParametersByPathTTL is the time to drop SSM Parameters by path data. This only queries EKS Optimized AMI
// releases, so we should expect this to be updated relatively infrequently.
SSMGetParametersByPathTTL = 24 * time.Hour
// DiscoveredCapacityCacheTTL is the time (60 days) after which discovered resource capacity data for an instance type
// is dropped if it has not been updated by a node creation event or refreshed during controller reconciliation.
DiscoveredCapacityCacheTTL = 60 * 24 * time.Hour
)

const (
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
nodeclassstatus "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/status"
nodeclasstermination "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/termination"
controllersinstancetype "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype"
controllersinstancetypecapacity "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype/capacity"
controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing"
"github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate"

Expand Down Expand Up @@ -68,6 +69,7 @@ func NewControllers(ctx context.Context, mgr manager.Manager, sess *session.Sess
nodeclaimtagging.NewController(kubeClient, instanceProvider),
controllerspricing.NewController(pricingProvider),
controllersinstancetype.NewController(instanceTypeProvider),
controllersinstancetypecapacity.NewController(kubeClient, instanceTypeProvider),
status.NewController[*v1.EC2NodeClass](kubeClient, mgr.GetEventRecorderFor("karpenter")),
}
if options.FromContext(ctx).InterruptionQueue != "" {
Expand Down
80 changes: 80 additions & 0 deletions pkg/controllers/providers/instancetype/capacity/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package capacity

import (
"context"
"fmt"

"github.com/awslabs/operatorpkg/reasonable"
corev1 "k8s.io/api/core/v1"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/operator/injection"

"github.com/aws/karpenter-provider-aws/pkg/providers/instancetype"
)

// Controller reconciles Nodes and forwards each one to the instance type provider so that
// the provider can update its discovered capacity data from the Node.
type Controller struct {
// kubeClient is passed through to the provider on every reconcile.
kubeClient client.Client
// instancetypeProvider is the provider whose UpdateInstanceTypeCapacityFromNode is invoked per Node.
instancetypeProvider *instancetype.DefaultProvider
}

// NewController builds a capacity Controller that uses kubeClient for cluster access and
// instancetypeProvider as the target of discovered-capacity updates.
func NewController(kubeClient client.Client, instancetypeProvider *instancetype.DefaultProvider) *Controller {
	c := new(Controller)
	c.kubeClient = kubeClient
	c.instancetypeProvider = instancetypeProvider
	return c
}

// Reconcile hands the given Node to the instance type provider so it can update its
// discovered capacity data. The context is tagged with this controller's name first.
// Any provider error is wrapped and returned; no requeue is ever requested explicitly.
func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcile.Result, error) {
	ctx = injection.WithControllerName(ctx, "providers.instancetype.capacity")

	updateErr := c.instancetypeProvider.UpdateInstanceTypeCapacityFromNode(ctx, c.kubeClient, node)
	if updateErr == nil {
		return reconcile.Result{}, nil
	}
	return reconcile.Result{}, fmt.Errorf("updating discovered capacity cache, %w", updateErr)
}

// Register attaches the controller to the manager as a Node watcher. Events are filtered
// so that only Nodes which are (or have just become) registered reach Reconcile, and
// reconciles are processed one at a time.
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
	// registered reports whether the object carries the registered label with value "true".
	registered := func(obj client.Object) bool {
		return obj.GetLabels()[karpv1.NodeRegisteredLabelKey] == "true"
	}

	nodeEvents := predicate.TypedFuncs[client.Object]{
		// Nodes that enter the informer cache already registered are reconciled, which
		// lets the discovered capacity cache be hydrated when the controller starts.
		CreateFunc: func(e event.TypedCreateEvent[client.Object]) bool {
			return registered(e.Object)
		},
		// Only the transition into the registered state triggers a reconcile; updates to
		// a Node that already had the label are no-ops and would only add lock contention
		// on the cache.
		UpdateFunc: func(e event.TypedUpdateEvent[client.Object]) bool {
			alreadyLabeled := e.ObjectOld.GetLabels()[karpv1.NodeRegisteredLabelKey] != ""
			return !alreadyLabeled && registered(e.ObjectNew)
		},
		// Deletions and generic events never trigger a reconcile.
		DeleteFunc:  func(event.TypedDeleteEvent[client.Object]) bool { return false },
		GenericFunc: func(event.TypedGenericEvent[client.Object]) bool { return false },
	}

	opts := controller.Options{
		RateLimiter:             reasonable.RateLimiter(),
		MaxConcurrentReconciles: 1,
	}

	return controllerruntime.NewControllerManagedBy(m).
		Named("providers.instancetype.capacity").
		For(&corev1.Node{}, builder.WithPredicates(nodeEvents)).
		WithOptions(opts).
		Complete(reconcile.AsReconciler(m.GetClient(), c))
}
168 changes: 168 additions & 0 deletions pkg/controllers/providers/instancetype/capacity/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package capacity_test

import (
"context"
"fmt"
"math"
"testing"

"github.com/aws/aws-sdk-go/service/ec2"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/utils/resources"

controllersinstancetypecapacity "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype/capacity"
"github.com/aws/karpenter-provider-aws/pkg/fake"

"sigs.k8s.io/karpenter/pkg/test/v1alpha1"

corev1 "k8s.io/api/core/v1"
coreoptions "sigs.k8s.io/karpenter/pkg/operator/options"
coretest "sigs.k8s.io/karpenter/pkg/test"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
. "sigs.k8s.io/karpenter/pkg/utils/testing"

"github.com/aws/karpenter-provider-aws/pkg/apis"
v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/aws/karpenter-provider-aws/pkg/test"
)

// Package-level state shared by the suite hooks and the specs below.
var (
	// Suite context and the function that cancels it.
	ctx  context.Context
	stop context.CancelFunc

	// Test environments and the controller under test.
	env        *coretest.Environment
	awsEnv     *test.Environment
	controller *controllersinstancetypecapacity.Controller

	// Fixture objects used by the specs.
	nodeClass *v1.EC2NodeClass
	nodeClaim *karpv1.NodeClaim
	node      *corev1.Node
)

// TestAWS is the `go test` entry point for this suite: it initialises the shared ctx via
// TestContextWithLogger, registers Ginkgo's Fail as the Gomega fail handler, and runs the
// specs under the suite name "CapacityCache".
func TestAWS(t *testing.T) {
ctx = TestContextWithLogger(t)
RegisterFailHandler(Fail)
RunSpecs(t, "CapacityCache")
}

// Suite-wide setup: builds the core test environment, layers core and AWS options onto ctx,
// makes ctx cancellable, then creates the AWS test environment, the fixtures and the controller.
var _ = BeforeSuite(func() {
// Core environment with the provider CRDs, the v1alpha1 test CRDs and the NodeClaim field indexer.
env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...), coretest.WithFieldIndexers(coretest.NodeClaimFieldIndexer(ctx)))
ctx = coreoptions.ToContext(ctx, coretest.Options())
// Pin VMMemoryOverheadPercent to 0.075; the "after AMI update" spec reads this value back from ctx.
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{
VMMemoryOverheadPercent: lo.ToPtr[float64](0.075),
}))
// stop is invoked in AfterSuite to cancel this context.
ctx, stop = context.WithCancel(ctx)
awsEnv = test.NewEnvironment(ctx, env)
// NOTE(review): nodeClass is created once here and never re-created per spec, so in-memory
// mutations made by one spec remain visible to later specs.
nodeClass = test.EC2NodeClass()
nodeClaim = coretest.NodeClaim()
node = coretest.Node()
controller = controllersinstancetypecapacity.NewController(env.Client, awsEnv.InstanceTypesProvider)
})

// Suite-wide teardown: cancels the suite context and stops the test environment,
// failing the suite if the environment does not stop cleanly.
var _ = AfterSuite(func() {
stop()
Expect(env.Stop()).To(Succeed(), "Failed to stop environment")
})

// Per-spec setup: resets the AWS test environment, primes the fake EC2 API with the fake
// instance types and their offerings, and refreshes the instance type provider from them.
var _ = BeforeEach(func() {
awsEnv.Reset()
ec2InstanceTypeInfo := fake.MakeInstances()
ec2Offerings := fake.MakeInstanceOfferings(ec2InstanceTypeInfo)
// Canned responses returned by the fake EC2 API for the two Describe* calls.
awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{
InstanceTypes: ec2InstanceTypeInfo,
})
awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{
InstanceTypeOfferings: ec2Offerings,
})
// Both provider refreshes must succeed before any spec runs.
Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed())
Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed())
})

// Per-spec teardown: runs the ExpectCleanedUp helper against the test API server client.
var _ = AfterEach(func() {
ExpectCleanedUp(ctx, env.Client)
})

// CapacityCache specs. Each spec starts from an applied EC2NodeClass, a registered
// "t3.medium" Node reporting 3840Mi of memory, and a NodeClaim whose status names that
// Node and the NodeClass's first AMI.
var _ = Describe("CapacityCache", func() {
BeforeEach(func() {
ExpectApplied(ctx, env.Client, nodeClass)

// Node labelled as a registered t3.medium with a memory capacity of 3840Mi.
node = coretest.Node(coretest.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
Labels: map[string]string{
corev1.LabelInstanceTypeStable: "t3.medium",
karpv1.NodeRegisteredLabelKey: "true",
},
},
Capacity: corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse(fmt.Sprintf("%dMi", 3840)),
},
})
ExpectApplied(ctx, env.Client, node)

// NodeClaim referencing the NodeClass by name; its status points at the Node above and
// at the first AMI listed in the NodeClass status.
nodeClaim = &karpv1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "test-nodeclaim",
},
Spec: karpv1.NodeClaimSpec{
NodeClassRef: &karpv1.NodeClassReference{
Name: nodeClass.Name,
},
Requirements: make([]karpv1.NodeSelectorRequirementWithMinValues, 0),
},
Status: karpv1.NodeClaimStatus{
NodeName: node.Name,
ImageID: nodeClass.Status.AMIs[0].ID,
},
}
ExpectApplied(ctx, env.Client, nodeClaim)
})
// After reconciling the Node, the t3.medium entry returned by List must report exactly
// the memory capacity the Node reported.
It("should update discovered capacity based on existing nodes", func() {
ExpectObjectReconciled(ctx, env.Client, controller, node)
instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, nodeClass)
Expect(err).To(BeNil())
i, ok := lo.Find(instanceTypes, func(i *cloudprovider.InstanceType) bool {
return i.Name == "t3.medium"
})
Expect(ok).To(BeTrue())
Expect(i.Capacity.Memory().Value()).To(Equal(node.Status.Capacity.Memory().Value()), "Expected capacity to match discovered node capacity")
})
// After the NodeClass's AMI ID changes, List must fall back to the
// VMMemoryOverheadPercent-based estimate instead of the previously discovered value.
It("should use VM_MEMORY_OVERHEAD_PERCENT calculation after AMI update", func() {
ExpectObjectReconciled(ctx, env.Client, controller, node)

// Update NodeClass AMI and list instance-types. Cached values from prior AMI should no longer be used.
// NOTE(review): only the in-memory nodeClass is mutated here; the object applied on the next
// line is nodeClaim, not nodeClass — confirm whether applying nodeClass was intended.
// NOTE(review): nodeClass is shared across specs, so this AMI ID change is not undone afterwards.
nodeClass.Status.AMIs[0].ID = "ami-new-test-id"
ExpectApplied(ctx, env.Client, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, controller, node)
instanceTypesNoCache, err := awsEnv.InstanceTypesProvider.List(ctx, nodeClass)
Expect(err).To(BeNil())
i, ok := lo.Find(instanceTypesNoCache, func(i *cloudprovider.InstanceType) bool {
return i.Name == "t3.medium"
})
Expect(ok).To(BeTrue())

// Calculate memory capacity based on VM_MEMORY_OVERHEAD_PERCENT and output from DescribeInstanceType
mem := resources.Quantity(fmt.Sprintf("%dMi", 8192)) // Reported memory from fake.MakeInstances()
// Overhead = ceil(bytes * percent / 1024 / 1024), expressed in Mi and subtracted from the reported memory.
mem.Sub(resource.MustParse(fmt.Sprintf("%dMi", int64(math.Ceil(float64(mem.Value())*options.FromContext(ctx).VMMemoryOverheadPercent/1024/1024)))))
Expect(i.Capacity.Memory().Value()).To(Equal(mem.Value()), "Expected capacity to match VMMemoryOverheadPercent calculation")
})
})
1 change: 1 addition & 0 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
)
instanceTypeProvider := instancetype.NewDefaultProvider(
cache.New(awscache.InstanceTypesAndZonesTTL, awscache.DefaultCleanupInterval),
cache.New(awscache.DiscoveredCapacityCacheTTL, awscache.DefaultCleanupInterval),
ec2api,
subnetProvider,
instancetype.NewDefaultResolver(*sess.Config.Region, pricingProvider, unavailableOfferingsCache),
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
fs.StringVar(&o.ClusterName, "cluster-name", env.WithDefaultString("CLUSTER_NAME", ""), "[REQUIRED] The kubernetes cluster name for resource discovery.")
fs.StringVar(&o.ClusterEndpoint, "cluster-endpoint", env.WithDefaultString("CLUSTER_ENDPOINT", ""), "The external kubernetes cluster endpoint for new nodes to connect with. If not specified, will discover the cluster endpoint using DescribeCluster API.")
fs.BoolVarWithEnv(&o.IsolatedVPC, "isolated-vpc", "ISOLATED_VPC", false, "If true, then assume we can't reach AWS services which don't have a VPC endpoint. This also has the effect of disabling look-ups to the AWS on-demand pricing endpoint.")
fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.")
fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types when cached information is unavailable.")
fs.StringVar(&o.InterruptionQueue, "interruption-queue", env.WithDefaultString("INTERRUPTION_QUEUE", ""), "Interruption queue is the name of the SQS queue used for processing interruption events from EC2. Interruption handling is disabled if not specified. Enabling interruption handling may require additional permissions on the controller service account. Additional permissions are outlined in the docs.")
fs.IntVar(&o.ReservedENIs, "reserved-enis", env.WithDefaultInt("RESERVED_ENIS", 0), "Reserved ENIs are not included in the calculations for max-pods or kube-reserved. This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html.")
}
Expand Down
Loading

0 comments on commit f9b3292

Please sign in to comment.