Create placeholder deployment for Auto clusters if --nodes > 0 (#520)
cartermckinnon authored Dec 13, 2024
1 parent bd9412a commit c74a892
Showing 4 changed files with 96 additions and 26 deletions.
14 changes: 7 additions & 7 deletions kubetest2/internal/deployers/eksapi/deployer.go
@@ -41,7 +41,7 @@ type deployer struct {
infraManager *InfrastructureManager
clusterManager *ClusterManager
addonManager *AddonManager
nodegroupManager *NodegroupManager
nodeManager *nodeManager
logManager *logManager
staticClusterManager *StaticClusterManager

@@ -124,7 +124,7 @@ func (d *deployer) Init() error {
d.infraManager = NewInfrastructureManager(d.awsClients, resourceID, d.metrics)
d.clusterManager = NewClusterManager(d.awsClients, resourceID)
d.addonManager = NewAddonManager(d.awsClients)
d.nodegroupManager = NewNodegroupManager(d.awsClients, resourceID)
d.nodeManager = NewNodeManager(d.awsClients, resourceID)
d.logManager = NewLogManager(d.awsClients, resourceID)
if d.deployerOptions.StaticClusterName != "" {
d.staticClusterManager = NewStaticClusterManager(&d.deployerOptions)
@@ -189,7 +189,7 @@ func (d *deployer) Up() error {
if err != nil {
return err
}
if d.deployerOptions.StaticClusterName != "" || d.deployerOptions.AutoMode {
if d.deployerOptions.StaticClusterName != "" {
klog.Infof("inited k8sclient, skip the rest resource creation for static cluster")
d.staticClusterManager.SetK8sClient(kubeconfig)
if err := d.staticClusterManager.EnsureNodeForStaticCluster(); err != nil {
@@ -215,7 +215,7 @@
return err
}
}
if err := d.nodegroupManager.createNodegroup(d.infra, d.cluster, &d.deployerOptions); err != nil {
if err := d.nodeManager.createNodes(d.infra, d.cluster, &d.deployerOptions, d.k8sClient); err != nil {
return err
}
if err := waitForReadyNodes(d.k8sClient, d.Nodes, d.NodeReadyTimeout); err != nil {
@@ -322,11 +322,11 @@ func (d *deployer) Down() error {
if d.deployerOptions.StaticClusterName != "" {
return d.staticClusterManager.TearDownNodeForStaticCluster()
}
return deleteResources(d.infraManager, d.clusterManager, d.nodegroupManager)
return deleteResources(d.infraManager, d.clusterManager, d.nodeManager)
}

func deleteResources(im *InfrastructureManager, cm *ClusterManager, nm *NodegroupManager) error {
if err := nm.deleteNodegroup(); err != nil {
func deleteResources(im *InfrastructureManager, cm *ClusterManager, nm *nodeManager) error {
if err := nm.deleteNodes(); err != nil {
return err
}
// the EKS-managed cluster security group may be associated with a leaked ENI
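The Up() path above now calls createNodes (which, for Auto Mode, creates the placeholder deployment) and then blocks on waitForReadyNodes, whose implementation is not part of this diff. For orientation, here is a rough, hypothetical sketch of what such a ready-node wait can look like with client-go; waitForReadyNodesSketch, the kubeconfig handling, and the 20-minute timeout are illustrative and are not the deployer's actual helper.

package main

import (
	"context"
	"os"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog/v2"
)

// waitForReadyNodesSketch polls the API server until at least `count` nodes
// report a Ready=True condition, or until the timeout expires.
func waitForReadyNodesSketch(ctx context.Context, client kubernetes.Interface, count int, timeout time.Duration) error {
	return wait.PollUntilContextTimeout(ctx, 10*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
		nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
		if err != nil {
			// treat API errors as transient and keep polling
			klog.Warningf("failed to list nodes, will retry: %v", err)
			return false, nil
		}
		ready := 0
		for _, node := range nodes.Items {
			for _, cond := range node.Status.Conditions {
				if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
					ready++
					break
				}
			}
		}
		klog.Infof("%d/%d nodes are ready", ready, count)
		return ready >= count, nil
	})
}

func main() {
	// build a clientset from a kubeconfig (path via KUBECONFIG is illustrative)
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		klog.Fatal(err)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatal(err)
	}
	// e.g. expect the 2 nodes forced by a placeholder deployment created with --nodes=2
	if err := waitForReadyNodesSketch(context.Background(), client, 2, 20*time.Minute); err != nil {
		klog.Fatal(err)
	}
}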
4 changes: 2 additions & 2 deletions kubetest2/internal/deployers/eksapi/janitor.go
@@ -66,9 +66,9 @@ func (j *janitor) Sweep(ctx context.Context) error {
clients := j.awsClientsForStack(stack)
infraManager := NewInfrastructureManager(clients, resourceID, j.metrics)
clusterManager := NewClusterManager(clients, resourceID)
nodegroupManager := NewNodegroupManager(clients, resourceID)
nodeManager := NewNodeManager(clients, resourceID)
klog.Infof("deleting resources (%v old): %s", resourceAge, resourceID)
if err := deleteResources(infraManager, clusterManager, nodegroupManager); err != nil {
if err := deleteResources(infraManager, clusterManager, nodeManager); err != nil {
errs = append(errs, fmt.Errorf("failed to delete resources: %s: %v", resourceID, err))
}
}
5 changes: 5 additions & 0 deletions kubetest2/internal/deployers/eksapi/logs.go
@@ -39,6 +39,11 @@ func (m *logManager) gatherLogsFromNodes(k8sClient *kubernetes.Clientset, opts *
klog.Info("--log-bucket is empty, no logs will be gathered!")
return nil
}
// TODO: gather logs from Auto nodes
if opts.AutoMode {
klog.Info("--auto-mode was used, no logs will be gathered!")
return nil
}
switch opts.UserDataFormat {
case "bootstrap.sh", "nodeadm", "": // if no --user-data-format was passed, we must be using managed nodes, which default to AL-based AMIs
return m.gatherLogsUsingScript(k8sClient, opts, phase)
99 changes: 82 additions & 17 deletions
@@ -6,6 +6,7 @@ import (
_ "embed"
"errors"
"fmt"
"math/rand/v2"
"strconv"
"strings"
"time"
@@ -19,7 +20,12 @@ import (
"github.com/aws/aws-sdk-go-v2/service/eks"
ekstypes "github.com/aws/aws-sdk-go-v2/service/eks/types"
"github.com/aws/smithy-go"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

"github.com/aws/aws-k8s-tester/kubetest2/internal/deployers/eksapi/templates"
)
@@ -58,20 +64,23 @@ var (
}
)

type NodegroupManager struct {
type nodeManager struct {
clients *awsClients
resourceID string
}

func NewNodegroupManager(clients *awsClients, resourceID string) *NodegroupManager {
return &NodegroupManager{
func NewNodeManager(clients *awsClients, resourceID string) *nodeManager {
return &nodeManager{
clients: clients,
resourceID: resourceID,
}
}

func (m *NodegroupManager) createNodegroup(infra *Infrastructure, cluster *Cluster, opts *deployerOptions) error {
if opts.UnmanagedNodes {
func (m *nodeManager) createNodes(infra *Infrastructure, cluster *Cluster, opts *deployerOptions, k8sClient *kubernetes.Clientset) error {
if opts.AutoMode {
_, err := m.createPlaceholderDeployment(k8sClient, opts.Nodes)
return err
} else if opts.UnmanagedNodes {
if len(opts.InstanceTypes) == 0 {
if out, err := m.clients.EC2().DescribeImages(context.TODO(), &ec2.DescribeImagesInput{
ImageIds: []string{opts.AMI},
@@ -96,7 +105,63 @@
}
}

func (m *NodegroupManager) createManagedNodegroup(infra *Infrastructure, cluster *Cluster, opts *deployerOptions) error {
// createPlaceholderDeployment creates a Deployment with the specified number of replicas that requires
// each replica to be scheduled on different nodes.
// This ensures that (at least) the specified number of nodes exist in an EKS Auto cluster
func (m *nodeManager) createPlaceholderDeployment(k8sClient *kubernetes.Clientset, replicas int) (*appsv1.Deployment, error) {
if replicas == 0 {
klog.Info("not creating placeholder deployment!")
return nil, nil
}
disambiguator := fmt.Sprintf("-%d", rand.IntN(1000))
labels := map[string]string{
"app": "placeholder-deployment" + disambiguator,
}
d := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{Name: "placeholder" + disambiguator, Namespace: "default"},
Spec: appsv1.DeploymentSpec{
Replicas: pointer.Int32(int32(replicas)),
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Affinity: &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: labels,
},
TopologyKey: "kubernetes.io/hostname",
},
},
},
},
Containers: []corev1.Container{
{
Name: "main",
Image: "public.ecr.aws/amazonlinux/amazonlinux:2023",
Command: []string{"sleep", "infinity"},
},
},
},
},
},
}
klog.Infof("creating placeholder deployment...")
d, err := k8sClient.AppsV1().Deployments("default").Create(context.TODO(), d, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create placeholder deployment: %v", err)
}
klog.Infof("created placeholder deployment: %+v", d)
return d, nil
}

func (m *nodeManager) createManagedNodegroup(infra *Infrastructure, cluster *Cluster, opts *deployerOptions) error {
klog.Infof("creating nodegroup...")
input := eks.CreateNodegroupInput{
ClusterName: aws.String(m.resourceID),
@@ -155,7 +220,7 @@ func (m *NodegroupManager) createManagedNodegroup(infra *Infrastructure, cluster
return nil
}

func (m *NodegroupManager) createUnmanagedNodegroup(infra *Infrastructure, cluster *Cluster, opts *deployerOptions) error {
func (m *nodeManager) createUnmanagedNodegroup(infra *Infrastructure, cluster *Cluster, opts *deployerOptions) error {
stackName := m.getUnmanagedNodegroupStackName()
klog.Infof("creating unmanaged nodegroup stack...")
userData, userDataIsMimePart, err := generateUserData(opts.UserDataFormat, cluster)
@@ -264,7 +329,7 @@ func (m *NodegroupManager) createUnmanagedNodegroup(infra *Infrastructure, clust
return nil
}

func (m *NodegroupManager) createUnmanagedNodegroupWithEFA(infra *Infrastructure, cluster *Cluster, opts *deployerOptions) error {
func (m *nodeManager) createUnmanagedNodegroupWithEFA(infra *Infrastructure, cluster *Cluster, opts *deployerOptions) error {
stackName := m.getUnmanagedNodegroupStackName()
klog.Infof("creating unmanaged nodegroup with EFA stack...")
userData, userDataIsMimePart, err := generateUserData(opts.UserDataFormat, cluster)
@@ -377,14 +442,14 @@ func (m *NodegroupManager) createUnmanagedNodegroupWithEFA(infra *Infrastructure
return nil
}

func (m *NodegroupManager) deleteNodegroup() error {
func (m *nodeManager) deleteNodes() error {
if err := m.deleteUnmanagedNodegroup(); err != nil {
return err
}
return m.deleteManagedNodegroup()
}

func (m *NodegroupManager) deleteManagedNodegroup() error {
func (m *nodeManager) deleteManagedNodegroup() error {
input := eks.DeleteNodegroupInput{
ClusterName: aws.String(m.resourceID),
NodegroupName: aws.String(m.resourceID),
@@ -412,7 +477,7 @@ func (m *NodegroupManager) deleteManagedNodegroup() error {
return nil
}

func (m *NodegroupManager) deleteUnmanagedNodegroup() error {
func (m *nodeManager) deleteUnmanagedNodegroup() error {
stackName := m.getUnmanagedNodegroupStackName()
input := cloudformation.DeleteStackInput{
StackName: aws.String(stackName),
@@ -441,11 +506,11 @@ func (m *NodegroupManager) deleteUnmanagedNodegroup() error {
return nil
}

func (m *NodegroupManager) getUnmanagedNodegroupStackName() string {
func (m *nodeManager) getUnmanagedNodegroupStackName() string {
return fmt.Sprintf("%s-unmanaged-nodegroup", m.resourceID)
}

func (m *NodegroupManager) verifyASGAMI(asgName string, amiId string) (bool, error) {
func (m *nodeManager) verifyASGAMI(asgName string, amiId string) (bool, error) {
klog.Infof("verifying AMI is %s for ASG: %s", amiId, asgName)
asgOut, err := m.clients.ASG().DescribeAutoScalingGroups(context.TODO(), &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: []string{asgName},
@@ -482,7 +547,7 @@ func (m *NodegroupManager) verifyASGAMI(asgName string, amiId string) (bool, err
return true, nil
}

func (m *NodegroupManager) getSubnetWithCapacity(infra *Infrastructure, opts *deployerOptions) (string, string, error) {
func (m *nodeManager) getSubnetWithCapacity(infra *Infrastructure, opts *deployerOptions) (string, string, error) {
var capacityReservationId string
capacityReservations, err := m.clients.EC2().DescribeCapacityReservations(context.TODO(), &ec2.DescribeCapacityReservationsInput{
Filters: []ec2types.Filter{
@@ -534,23 +599,23 @@ func (m *NodegroupManager) getSubnetWithCapacity(infra *Infrastructure, opts *de
return subnetId, capacityReservationId, nil
}

func (m *NodegroupManager) getValidDefaultInstanceTypesByEKSAMIType(amiType ekstypes.AMITypes) ([]string, error) {
func (m *nodeManager) getValidDefaultInstanceTypesByEKSAMIType(amiType ekstypes.AMITypes) ([]string, error) {
defaults, ok := defaultInstanceTypesByEKSAMITypes[amiType]
if !ok {
return nil, fmt.Errorf("no default instance types known for AmiType: %v", amiType)
}
return m.getValidInstanceTypesFromList(defaults)
}

func (m *NodegroupManager) getValidDefaultInstanceTypesByEC2Arch(arch ec2types.ArchitectureValues) ([]string, error) {
func (m *nodeManager) getValidDefaultInstanceTypesByEC2Arch(arch ec2types.ArchitectureValues) ([]string, error) {
defaults, ok := defaultInstanceTypesByEC2ArchitectureValues[arch]
if !ok {
return nil, fmt.Errorf("no default instance types known for AMI architecture: %v", arch)
}
return m.getValidInstanceTypesFromList(defaults)
}

func (m *NodegroupManager) getValidInstanceTypesFromList(desiredInstanceTypes []string) ([]string, error) {
func (m *nodeManager) getValidInstanceTypesFromList(desiredInstanceTypes []string) ([]string, error) {
var validInstanceTypes []string
for _, instanceType := range desiredInstanceTypes {
ec2InstanceType := ec2types.InstanceType(instanceType)
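Because createPlaceholderDeployment (above) requires pod anti-affinity on kubernetes.io/hostname, each replica must be scheduled on a distinct node, which is what forces an EKS Auto Mode cluster to provision at least --nodes nodes. A small, hypothetical follow-up check along those lines, assuming the same client-go Clientset and the "app" label set by the deployment; countPlaceholderNodes is illustrative and not part of this commit.

package sketch

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// countPlaceholderNodes returns the number of distinct nodes hosting pods that
// carry the placeholder deployment's "app" label. Once every replica is
// scheduled, the required anti-affinity guarantees this matches the replica count.
func countPlaceholderNodes(ctx context.Context, client kubernetes.Interface, appLabel string) (int, error) {
	pods, err := client.CoreV1().Pods("default").List(ctx, metav1.ListOptions{
		LabelSelector: "app=" + appLabel,
	})
	if err != nil {
		return 0, fmt.Errorf("failed to list placeholder pods: %v", err)
	}
	hosts := make(map[string]struct{})
	for _, pod := range pods.Items {
		if pod.Spec.NodeName != "" {
			hosts[pod.Spec.NodeName] = struct{}{}
		}
	}
	return len(hosts), nil
}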

