From 37ca1c2f71b0c1bb644553c67e9611b0fad7d182 Mon Sep 17 00:00:00 2001
From: Drew Sirenko <68304519+AndrewSirenko@users.noreply.github.com>
Date: Fri, 2 Aug 2024 15:32:21 -0400
Subject: [PATCH] fix: Ensure persistent volumes are detached before deleting node (#1294)

Co-authored-by: Jason Deal
---
 kwok/charts/templates/clusterrole.yaml |  2 +-
 pkg/controllers/controllers.go         |  2 +-
 .../node/termination/controller.go     | 83 +++++++++++++++++--
 .../node/termination/suite_test.go     | 78 ++++++++++++++++-
 pkg/operator/operator.go               |  4 +
 pkg/test/environment.go                |  9 ++
 pkg/test/storage.go                    | 25 ++++++
 pkg/utils/node/node.go                 | 10 +++
 pkg/utils/pod/scheduling.go            | 14 +++-
 9 files changed, 212 insertions(+), 15 deletions(-)

diff --git a/kwok/charts/templates/clusterrole.yaml b/kwok/charts/templates/clusterrole.yaml
index 199951021b..5e1caa370a 100644
--- a/kwok/charts/templates/clusterrole.yaml
+++ b/kwok/charts/templates/clusterrole.yaml
@@ -36,7 +36,7 @@ rules:
     resources: ["pods", "nodes", "persistentvolumes", "persistentvolumeclaims", "replicationcontrollers", "namespaces"]
     verbs: ["get", "list", "watch"]
   - apiGroups: ["storage.k8s.io"]
-    resources: ["storageclasses", "csinodes"]
+    resources: ["storageclasses", "csinodes", "volumeattachments"]
     verbs: ["get", "watch", "list"]
   - apiGroups: ["apps"]
     resources: ["daemonsets", "deployments", "replicasets", "statefulsets"]
diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go
index 65956d7b5e..efbe349530 100644
--- a/pkg/controllers/controllers.go
+++ b/pkg/controllers/controllers.go
@@ -83,7 +83,7 @@ func NewControllers(
         informer.NewPodController(kubeClient, cluster),
         informer.NewNodePoolController(kubeClient, cluster),
         informer.NewNodeClaimController(kubeClient, cluster),
-        termination.NewController(kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
+        termination.NewController(clock, kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
         metricspod.NewController(kubeClient),
         metricsnodepool.NewController(kubeClient),
         metricsnode.NewController(cluster),
diff --git a/pkg/controllers/node/termination/controller.go b/pkg/controllers/node/termination/controller.go
index d217893edf..8f7ce15914 100644
--- a/pkg/controllers/node/termination/controller.go
+++ b/pkg/controllers/node/termination/controller.go
@@ -21,15 +21,16 @@ import (
     "fmt"
     "time"
 
-    "k8s.io/apimachinery/pkg/api/errors"
-
-    "sigs.k8s.io/karpenter/pkg/utils/termination"
-
     "github.com/prometheus/client_golang/prometheus"
+    "github.com/samber/lo"
     "golang.org/x/time/rate"
     corev1 "k8s.io/api/core/v1"
+    storagev1 "k8s.io/api/storage/v1"
     "k8s.io/apimachinery/pkg/api/equality"
+    "k8s.io/apimachinery/pkg/api/errors"
+    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/util/workqueue"
+    "k8s.io/utils/clock"
     controllerruntime "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
     "sigs.k8s.io/controller-runtime/pkg/controller"
@@ -38,19 +39,22 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/manager"
     "sigs.k8s.io/controller-runtime/pkg/reconcile"
 
-    "sigs.k8s.io/karpenter/pkg/operator/injection"
-
     v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
     "sigs.k8s.io/karpenter/pkg/cloudprovider"
     "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
     terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
     "sigs.k8s.io/karpenter/pkg/events"
     "sigs.k8s.io/karpenter/pkg/metrics"
+    "sigs.k8s.io/karpenter/pkg/operator/injection"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node" + "sigs.k8s.io/karpenter/pkg/utils/pod" + "sigs.k8s.io/karpenter/pkg/utils/termination" + volumeutil "sigs.k8s.io/karpenter/pkg/utils/volume" ) // Controller for the resource type Controller struct { + clock clock.Clock kubeClient client.Client cloudProvider cloudprovider.CloudProvider terminator *terminator.Terminator @@ -58,8 +62,9 @@ type Controller struct { } // NewController constructs a controller instance -func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, terminator *terminator.Terminator, recorder events.Recorder) *Controller { +func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, terminator *terminator.Terminator, recorder events.Recorder) *Controller { return &Controller{ + clock: clk, kubeClient: kubeClient, cloudProvider: cloudProvider, terminator: terminator, @@ -119,6 +124,18 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile return reconcile.Result{RequeueAfter: 1 * time.Second}, nil } + // In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait + // for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer. + // However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting. + if nodeTerminationTime == nil || c.clock.Now().Before(*nodeTerminationTime) { + areVolumesDetached, err := c.ensureVolumesDetached(ctx, node) + if err != nil { + return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err) + } + if !areVolumesDetached { + return reconcile.Result{RequeueAfter: 1 * time.Second}, nil + } + } nodeClaims, err = nodeutils.GetNodeClaims(ctx, node, c.kubeClient) if err != nil { return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err) @@ -158,6 +175,58 @@ func (c *Controller) deleteAllNodeClaims(ctx context.Context, nodeClaims ...*v1. 
     return nil
 }
 
+func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) {
+    volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
+    if err != nil {
+        return false, err
+    }
+    // Filter out VolumeAttachments associated with not drain-able Pods
+    filteredVolumeAttachments, err := filterVolumeAttachments(ctx, c.kubeClient, node, volumeAttachments, c.clock)
+    if err != nil {
+        return false, err
+    }
+    return len(filteredVolumeAttachments) == 0, nil
+}
+
+// filterVolumeAttachments filters out storagev1.VolumeAttachments that should not block the termination
+// of the passed corev1.Node
+func filterVolumeAttachments(ctx context.Context, kubeClient client.Client, node *corev1.Node, volumeAttachments []*storagev1.VolumeAttachment, clk clock.Clock) ([]*storagev1.VolumeAttachment, error) {
+    // No need to filter empty VolumeAttachments list
+    if len(volumeAttachments) == 0 {
+        return volumeAttachments, nil
+    }
+    // Create list of non-drain-able Pods associated with Node
+    pods, err := nodeutils.GetPods(ctx, kubeClient, node)
+    if err != nil {
+        return nil, err
+    }
+    unDrainablePods := lo.Reject(pods, func(p *corev1.Pod, _ int) bool {
+        return pod.IsDrainable(p, clk)
+    })
+    // Filter out VolumeAttachments associated with non-drain-able Pods
+    // Match on Pod -> PersistentVolumeClaim -> PersistentVolume Name <- VolumeAttachment
+    shouldFilterOutVolume := sets.New[string]()
+    for _, p := range unDrainablePods {
+        for _, v := range p.Spec.Volumes {
+            pvc, err := volumeutil.GetPersistentVolumeClaim(ctx, kubeClient, p, v)
+            if errors.IsNotFound(err) {
+                continue
+            }
+            if err != nil {
+                return nil, err
+            }
+            if pvc != nil {
+                shouldFilterOutVolume.Insert(pvc.Spec.VolumeName)
+            }
+        }
+    }
+    filteredVolumeAttachments := lo.Reject(volumeAttachments, func(v *storagev1.VolumeAttachment, _ int) bool {
+        pvName := v.Spec.Source.PersistentVolumeName
+        return pvName == nil || shouldFilterOutVolume.Has(*pvName)
+    })
+    return filteredVolumeAttachments, nil
+}
+
 func (c *Controller) removeFinalizer(ctx context.Context, n *corev1.Node) error {
     stored := n.DeepCopy()
     controllerutil.RemoveFinalizer(n, v1.TerminationFinalizer)
diff --git a/pkg/controllers/node/termination/suite_test.go b/pkg/controllers/node/termination/suite_test.go
index d2db04648c..12b63a7a1c 100644
--- a/pkg/controllers/node/termination/suite_test.go
+++ b/pkg/controllers/node/termination/suite_test.go
@@ -64,12 +64,12 @@ func TestAPIs(t *testing.T) {
 
 var _ = BeforeSuite(func() {
     fakeClock = clock.NewFakeClock(time.Now())
-    env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx)))
+    env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx), test.VolumeAttachmentFieldIndexer(ctx)))
 
     cloudProvider = fake.NewCloudProvider()
     recorder = test.NewEventRecorder()
     queue = terminator.NewQueue(env.Client, recorder)
-    terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder)
+    terminationController = termination.NewController(fakeClock, env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder)
 })
 
 var _ = AfterSuite(func() {
@@ -763,6 +763,80 @@ var _ = Describe("Termination", func() {
             ExpectSingletonReconciled(ctx, queue)
             ExpectDeleted(ctx, env.Client, pod)
         })
Context("VolumeAttachments", func() { + It("should wait for volume attachments", func() { + va := test.VolumeAttachment(test.VolumeAttachmentOptions{ + NodeName: node.Name, + VolumeName: "foo", + }) + ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, va) + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + + ExpectObjectReconciled(ctx, env.Client, terminationController, node) + ExpectObjectReconciled(ctx, env.Client, terminationController, node) + ExpectExists(ctx, env.Client, node) + + ExpectDeleted(ctx, env.Client, va) + ExpectObjectReconciled(ctx, env.Client, terminationController, node) + ExpectObjectReconciled(ctx, env.Client, terminationController, node) + ExpectNotFound(ctx, env.Client, node) + }) + It("should only wait for volume attachments associated with drainable pods", func() { + vaDrainable := test.VolumeAttachment(test.VolumeAttachmentOptions{ + NodeName: node.Name, + VolumeName: "foo", + }) + vaNonDrainable := test.VolumeAttachment(test.VolumeAttachmentOptions{ + NodeName: node.Name, + VolumeName: "bar", + }) + pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{ + VolumeName: "bar", + }) + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: defaultOwnerRefs, + }, + Tolerations: []corev1.Toleration{{ + Key: v1.DisruptedTaintKey, + Operator: corev1.TolerationOpExists, + }}, + PersistentVolumeClaims: []string{pvc.Name}, + }) + ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, vaDrainable, vaNonDrainable, pod, pvc) + ExpectManualBinding(ctx, env.Client, pod, node) + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + + ExpectObjectReconciled(ctx, env.Client, terminationController, node) + ExpectObjectReconciled(ctx, env.Client, terminationController, node) + ExpectExists(ctx, env.Client, node) + + ExpectDeleted(ctx, env.Client, vaDrainable) + ExpectObjectReconciled(ctx, env.Client, terminationController, node) + ExpectObjectReconciled(ctx, env.Client, terminationController, node) + ExpectNotFound(ctx, env.Client, node) + }) + It("should wait for volume attachments until the nodeclaim's termination grace period expires", func() { + va := test.VolumeAttachment(test.VolumeAttachmentOptions{ + NodeName: node.Name, + VolumeName: "foo", + }) + nodeClaim.Annotations = map[string]string{ + v1.NodeClaimTerminationTimestampAnnotationKey: fakeClock.Now().Add(time.Minute).Format(time.RFC3339), + } + ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, va) + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + + ExpectObjectReconciled(ctx, env.Client, terminationController, node) + ExpectObjectReconciled(ctx, env.Client, terminationController, node) + ExpectExists(ctx, env.Client, node) + + fakeClock.Step(5 * time.Minute) + ExpectObjectReconciled(ctx, env.Client, terminationController, node) + ExpectObjectReconciled(ctx, env.Client, terminationController, node) + ExpectNotFound(ctx, env.Client, node) + }) + }) }) Context("Metrics", func() { It("should fire the terminationSummary metric when deleting nodes", func() { diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index a036ee4528..e238a3d200 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -35,6 +35,7 @@ import ( "github.com/prometheus/client_golang/prometheus" coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" "k8s.io/klog/v2" "knative.dev/pkg/changeset" ctrl "sigs.k8s.io/controller-runtime" @@ -218,6 +219,9 @@ func NewOperator() (context.Context, *Operator) { 
     lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodePool{}, "spec.template.spec.nodeClassRef.name", func(o client.Object) []string {
         return []string{o.(*v1.NodePool).Spec.Template.Spec.NodeClassRef.Name}
     }), "failed to setup nodepool nodeclassref name indexer")
+    lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &storagev1.VolumeAttachment{}, "spec.nodeName", func(o client.Object) []string {
+        return []string{o.(*storagev1.VolumeAttachment).Spec.NodeName}
+    }), "failed to setup volumeattachment indexer")
 
     lo.Must0(mgr.AddHealthzCheck("healthz", healthz.Ping))
     lo.Must0(mgr.AddReadyzCheck("readyz", healthz.Ping))
diff --git a/pkg/test/environment.go b/pkg/test/environment.go
index 715e92e8ce..f48f8ccbf2 100644
--- a/pkg/test/environment.go
+++ b/pkg/test/environment.go
@@ -25,6 +25,7 @@ import (
     "github.com/awslabs/operatorpkg/option"
     "github.com/samber/lo"
     corev1 "k8s.io/api/core/v1"
+    storagev1 "k8s.io/api/storage/v1"
     apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
     "k8s.io/apimachinery/pkg/util/version"
     "k8s.io/client-go/kubernetes"
@@ -75,6 +76,14 @@ func NodeClaimFieldIndexer(ctx context.Context) func(cache.Cache) error {
     }
 }
 
+func VolumeAttachmentFieldIndexer(ctx context.Context) func(cache.Cache) error {
+    return func(c cache.Cache) error {
+        return c.IndexField(ctx, &storagev1.VolumeAttachment{}, "spec.nodeName", func(obj client.Object) []string {
+            return []string{obj.(*storagev1.VolumeAttachment).Spec.NodeName}
+        })
+    }
+}
+
 func NewEnvironment(options ...option.Function[EnvironmentOptions]) *Environment {
     opts := option.Resolve(options...)
     ctx, cancel := context.WithCancel(context.Background())
diff --git a/pkg/test/storage.go b/pkg/test/storage.go
index ab0dce5ef6..96fe7c41ce 100644
--- a/pkg/test/storage.go
+++ b/pkg/test/storage.go
@@ -153,3 +153,28 @@ func StorageClass(overrides ...StorageClassOptions) *storagev1.StorageClass {
         VolumeBindingMode: options.VolumeBindingMode,
     }
 }
+
+type VolumeAttachmentOptions struct {
+    metav1.ObjectMeta
+    NodeName   string
+    VolumeName string
+}
+
+func VolumeAttachment(overrides ...VolumeAttachmentOptions) *storagev1.VolumeAttachment {
+    options := VolumeAttachmentOptions{}
+    for _, opts := range overrides {
+        if err := mergo.Merge(&options, opts, mergo.WithOverride); err != nil {
+            panic(fmt.Sprintf("Failed to merge options: %s", err))
+        }
+    }
+    return &storagev1.VolumeAttachment{
+        ObjectMeta: ObjectMeta(options.ObjectMeta),
+        Spec: storagev1.VolumeAttachmentSpec{
+            NodeName: options.NodeName,
+            Attacher: "fake-csi",
+            Source: storagev1.VolumeAttachmentSource{
+                PersistentVolumeName: lo.ToPtr(options.VolumeName),
+            },
+        },
+    }
+}
diff --git a/pkg/utils/node/node.go b/pkg/utils/node/node.go
index 34b8d97086..5f8428bd72 100644
--- a/pkg/utils/node/node.go
+++ b/pkg/utils/node/node.go
@@ -22,6 +22,7 @@ import (
     "fmt"
 
     corev1 "k8s.io/api/core/v1"
+    storagev1 "k8s.io/api/storage/v1"
 
     v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
 
@@ -143,6 +144,15 @@ func GetProvisionablePods(ctx context.Context, kubeClient client.Client) ([]*cor
     }), nil
 }
 
+// GetVolumeAttachments grabs all volumeAttachments associated with the passed node
+func GetVolumeAttachments(ctx context.Context, kubeClient client.Client, node *corev1.Node) ([]*storagev1.VolumeAttachment, error) {
+    var volumeAttachmentList storagev1.VolumeAttachmentList
+    if err := kubeClient.List(ctx, &volumeAttachmentList, client.MatchingFields{"spec.nodeName": node.Name}); err != nil {
+        return nil, fmt.Errorf("listing volumeAttachments, %w", err)
+    }
+    return lo.ToSlicePtr(volumeAttachmentList.Items), nil
+}
+
 func GetCondition(n *corev1.Node, match corev1.NodeConditionType) corev1.NodeCondition {
     for _, condition := range n.Status.Conditions {
         if condition.Type == match {
diff --git a/pkg/utils/pod/scheduling.go b/pkg/utils/pod/scheduling.go
index a092185ca2..139df8e634 100644
--- a/pkg/utils/pod/scheduling.go
+++ b/pkg/utils/pod/scheduling.go
@@ -62,14 +62,20 @@ func IsEvictable(pod *corev1.Pod) bool {
 
 // IsWaitingEviction checks if this is a pod that we are waiting to be removed from the node by ensuring that the pod:
 // - Isn't a terminal pod (Failed or Succeeded)
-// - Isn't a pod that has been terminating past its terminationGracePeriodSeconds
+// - Can be drained by Karpenter (See IsDrainable)
+func IsWaitingEviction(pod *corev1.Pod, clk clock.Clock) bool {
+    return !IsTerminal(pod) &&
+        IsDrainable(pod, clk)
+}
+
+// IsDrainable checks if a pod can be drained by Karpenter by ensuring that the pod:
 // - Doesn't tolerate the "karpenter.sh/disruption=disrupting" taint
+// - Isn't a pod that has been terminating past its terminationGracePeriodSeconds
 // - Isn't a mirror pod (https://kubernetes.io/docs/tasks/configure-pod-container/static-pod/)
 // Note: pods with the `karpenter.sh/do-not-disrupt` annotation are included since node drain should stall until these pods are evicted or become terminal, even though Karpenter won't orchestrate the eviction.
-func IsWaitingEviction(pod *corev1.Pod, clk clock.Clock) bool {
-    return !IsTerminal(pod) &&
+func IsDrainable(pod *corev1.Pod, clk clock.Clock) bool {
+    return !ToleratesDisruptedNoScheduleTaint(pod) &&
         !IsStuckTerminating(pod, clk) &&
-        !ToleratesDisruptedNoScheduleTaint(pod) &&
         // Mirror pods cannot be deleted through the API server since they are created and managed by kubelet
         // This means they are effectively read-only and can't be controlled by API server calls
         // https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#drain
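
For context on how the pieces above fit together: the "spec.nodeName" field index registered in pkg/operator/operator.go is what lets GetVolumeAttachments look up a node's VolumeAttachments with a field selector instead of scanning the whole list. The following is a minimal sketch, not part of this patch, of consuming such an index through a controller-runtime client; the helper name listAttachmentsForNode and the client wiring are illustrative assumptions.

package sketch

import (
    "context"
    "fmt"

    storagev1 "k8s.io/api/storage/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// listAttachmentsForNode returns the VolumeAttachments whose spec.nodeName matches
// the given node name. It assumes the client was built from a manager/cache that
// registered a "spec.nodeName" field index on storagev1.VolumeAttachment, as the
// patch does in NewOperator; without that index the List call returns an error.
func listAttachmentsForNode(ctx context.Context, c client.Client, nodeName string) ([]storagev1.VolumeAttachment, error) {
    attachmentList := &storagev1.VolumeAttachmentList{}
    if err := c.List(ctx, attachmentList, client.MatchingFields{"spec.nodeName": nodeName}); err != nil {
        return nil, fmt.Errorf("listing volumeattachments for node %s, %w", nodeName, err)
    }
    return attachmentList.Items, nil
}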