Skip to content

Commit

Permalink
fix: Ensure persistent volumes are detached before deleting node (kub…
Browse files Browse the repository at this point in the history
…ernetes-sigs#1294)

Co-authored-by: Jason Deal <jmdeal@amazon.com>
  • Loading branch information
2 people authored and BEvgeniyS committed Sep 16, 2024
1 parent f3962c5 commit 37ca1c2
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 15 deletions.
2 changes: 1 addition & 1 deletion kwok/charts/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ rules:
resources: ["pods", "nodes", "persistentvolumes", "persistentvolumeclaims", "replicationcontrollers", "namespaces"]
verbs: ["get", "list", "watch"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses", "csinodes"]
resources: ["storageclasses", "csinodes", "volumeattachments"]
verbs: ["get", "watch", "list"]
- apiGroups: ["apps"]
resources: ["daemonsets", "deployments", "replicasets", "statefulsets"]
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewControllers(
informer.NewPodController(kubeClient, cluster),
informer.NewNodePoolController(kubeClient, cluster),
informer.NewNodeClaimController(kubeClient, cluster),
termination.NewController(kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
termination.NewController(clock, kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
metricspod.NewController(kubeClient),
metricsnodepool.NewController(kubeClient),
metricsnode.NewController(cluster),
Expand Down
83 changes: 76 additions & 7 deletions pkg/controllers/node/termination/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/errors"

"sigs.k8s.io/karpenter/pkg/utils/termination"

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -38,28 +39,32 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"sigs.k8s.io/karpenter/pkg/operator/injection"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
"sigs.k8s.io/karpenter/pkg/utils/pod"
"sigs.k8s.io/karpenter/pkg/utils/termination"
volumeutil "sigs.k8s.io/karpenter/pkg/utils/volume"
)

// Controller for the resource
type Controller struct {
clock clock.Clock
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
terminator *terminator.Terminator
recorder events.Recorder
}

// NewController constructs a controller instance
func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, terminator *terminator.Terminator, recorder events.Recorder) *Controller {
func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, terminator *terminator.Terminator, recorder events.Recorder) *Controller {
return &Controller{
clock: clk,
kubeClient: kubeClient,
cloudProvider: cloudProvider,
terminator: terminator,
Expand Down Expand Up @@ -119,6 +124,18 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile

return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
// In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
// for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
// However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
if nodeTerminationTime == nil || c.clock.Now().Before(*nodeTerminationTime) {
areVolumesDetached, err := c.ensureVolumesDetached(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
}
if !areVolumesDetached {
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
}
nodeClaims, err = nodeutils.GetNodeClaims(ctx, node, c.kubeClient)
if err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
Expand Down Expand Up @@ -158,6 +175,58 @@ func (c *Controller) deleteAllNodeClaims(ctx context.Context, nodeClaims ...*v1.
return nil
}

func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) {
volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
if err != nil {
return false, err
}
// Filter out VolumeAttachments associated with not drain-able Pods
filteredVolumeAttachments, err := filterVolumeAttachments(ctx, c.kubeClient, node, volumeAttachments, c.clock)
if err != nil {
return false, err
}
return len(filteredVolumeAttachments) == 0, nil
}

// filterVolumeAttachments filters out storagev1.VolumeAttachments that should not block the termination
// of the passed corev1.Node
func filterVolumeAttachments(ctx context.Context, kubeClient client.Client, node *corev1.Node, volumeAttachments []*storagev1.VolumeAttachment, clk clock.Clock) ([]*storagev1.VolumeAttachment, error) {
// No need to filter empty VolumeAttachments list
if len(volumeAttachments) == 0 {
return volumeAttachments, nil
}
// Create list of non-drain-able Pods associated with Node
pods, err := nodeutils.GetPods(ctx, kubeClient, node)
if err != nil {
return nil, err
}
unDrainablePods := lo.Reject(pods, func(p *corev1.Pod, _ int) bool {
return pod.IsDrainable(p, clk)
})
// Filter out VolumeAttachments associated with non-drain-able Pods
// Match on Pod -> PersistentVolumeClaim -> PersistentVolume Name <- VolumeAttachment
shouldFilterOutVolume := sets.New[string]()
for _, p := range unDrainablePods {
for _, v := range p.Spec.Volumes {
pvc, err := volumeutil.GetPersistentVolumeClaim(ctx, kubeClient, p, v)
if errors.IsNotFound(err) {
continue
}
if err != nil {
return nil, err
}
if pvc != nil {
shouldFilterOutVolume.Insert(pvc.Spec.VolumeName)
}
}
}
filteredVolumeAttachments := lo.Reject(volumeAttachments, func(v *storagev1.VolumeAttachment, _ int) bool {
pvName := v.Spec.Source.PersistentVolumeName
return pvName == nil || shouldFilterOutVolume.Has(*pvName)
})
return filteredVolumeAttachments, nil
}

func (c *Controller) removeFinalizer(ctx context.Context, n *corev1.Node) error {
stored := n.DeepCopy()
controllerutil.RemoveFinalizer(n, v1.TerminationFinalizer)
Expand Down
78 changes: 76 additions & 2 deletions pkg/controllers/node/termination/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ func TestAPIs(t *testing.T) {

var _ = BeforeSuite(func() {
fakeClock = clock.NewFakeClock(time.Now())
env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx)))
env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx), test.VolumeAttachmentFieldIndexer(ctx)))

cloudProvider = fake.NewCloudProvider()
recorder = test.NewEventRecorder()
queue = terminator.NewQueue(env.Client, recorder)
terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder)
terminationController = termination.NewController(fakeClock, env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder)
})

var _ = AfterSuite(func() {
Expand Down Expand Up @@ -763,6 +763,80 @@ var _ = Describe("Termination", func() {
ExpectSingletonReconciled(ctx, queue)
ExpectDeleted(ctx, env.Client, pod)
})
Context("VolumeAttachments", func() {
It("should wait for volume attachments", func() {
va := test.VolumeAttachment(test.VolumeAttachmentOptions{
NodeName: node.Name,
VolumeName: "foo",
})
ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, va)
Expect(env.Client.Delete(ctx, node)).To(Succeed())

ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)

ExpectDeleted(ctx, env.Client, va)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
})
It("should only wait for volume attachments associated with drainable pods", func() {
vaDrainable := test.VolumeAttachment(test.VolumeAttachmentOptions{
NodeName: node.Name,
VolumeName: "foo",
})
vaNonDrainable := test.VolumeAttachment(test.VolumeAttachmentOptions{
NodeName: node.Name,
VolumeName: "bar",
})
pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
VolumeName: "bar",
})
pod := test.Pod(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
OwnerReferences: defaultOwnerRefs,
},
Tolerations: []corev1.Toleration{{
Key: v1.DisruptedTaintKey,
Operator: corev1.TolerationOpExists,
}},
PersistentVolumeClaims: []string{pvc.Name},
})
ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, vaDrainable, vaNonDrainable, pod, pvc)
ExpectManualBinding(ctx, env.Client, pod, node)
Expect(env.Client.Delete(ctx, node)).To(Succeed())

ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)

ExpectDeleted(ctx, env.Client, vaDrainable)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
})
It("should wait for volume attachments until the nodeclaim's termination grace period expires", func() {
va := test.VolumeAttachment(test.VolumeAttachmentOptions{
NodeName: node.Name,
VolumeName: "foo",
})
nodeClaim.Annotations = map[string]string{
v1.NodeClaimTerminationTimestampAnnotationKey: fakeClock.Now().Add(time.Minute).Format(time.RFC3339),
}
ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, va)
Expect(env.Client.Delete(ctx, node)).To(Succeed())

ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)

fakeClock.Step(5 * time.Minute)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
})
})
})
Context("Metrics", func() {
It("should fire the terminationSummary metric when deleting nodes", func() {
Expand Down
4 changes: 4 additions & 0 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/klog/v2"
"knative.dev/pkg/changeset"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -218,6 +219,9 @@ func NewOperator() (context.Context, *Operator) {
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodePool{}, "spec.template.spec.nodeClassRef.name", func(o client.Object) []string {
return []string{o.(*v1.NodePool).Spec.Template.Spec.NodeClassRef.Name}
}), "failed to setup nodepool nodeclassref name indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &storagev1.VolumeAttachment{}, "spec.nodeName", func(o client.Object) []string {
return []string{o.(*storagev1.VolumeAttachment).Spec.NodeName}
}), "failed to setup volumeattachment indexer")

lo.Must0(mgr.AddHealthzCheck("healthz", healthz.Ping))
lo.Must0(mgr.AddReadyzCheck("readyz", healthz.Ping))
Expand Down
9 changes: 9 additions & 0 deletions pkg/test/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/awslabs/operatorpkg/option"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -75,6 +76,14 @@ func NodeClaimFieldIndexer(ctx context.Context) func(cache.Cache) error {
}
}

func VolumeAttachmentFieldIndexer(ctx context.Context) func(cache.Cache) error {
return func(c cache.Cache) error {
return c.IndexField(ctx, &storagev1.VolumeAttachment{}, "spec.nodeName", func(obj client.Object) []string {
return []string{obj.(*storagev1.VolumeAttachment).Spec.NodeName}
})
}
}

func NewEnvironment(options ...option.Function[EnvironmentOptions]) *Environment {
opts := option.Resolve(options...)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
25 changes: 25 additions & 0 deletions pkg/test/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,28 @@ func StorageClass(overrides ...StorageClassOptions) *storagev1.StorageClass {
VolumeBindingMode: options.VolumeBindingMode,
}
}

type VolumeAttachmentOptions struct {
metav1.ObjectMeta
NodeName string
VolumeName string
}

func VolumeAttachment(overrides ...VolumeAttachmentOptions) *storagev1.VolumeAttachment {
options := VolumeAttachmentOptions{}
for _, opts := range overrides {
if err := mergo.Merge(&options, opts, mergo.WithOverride); err != nil {
panic(fmt.Sprintf("Failed to merge options: %s", err))
}
}
return &storagev1.VolumeAttachment{
ObjectMeta: ObjectMeta(options.ObjectMeta),
Spec: storagev1.VolumeAttachmentSpec{
NodeName: options.NodeName,
Attacher: "fake-csi",
Source: storagev1.VolumeAttachmentSource{
PersistentVolumeName: lo.ToPtr(options.VolumeName),
},
},
}
}
10 changes: 10 additions & 0 deletions pkg/utils/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"

Expand Down Expand Up @@ -143,6 +144,15 @@ func GetProvisionablePods(ctx context.Context, kubeClient client.Client) ([]*cor
}), nil
}

// GetVolumeAttachments grabs all volumeAttachments associated with the passed node
func GetVolumeAttachments(ctx context.Context, kubeClient client.Client, node *corev1.Node) ([]*storagev1.VolumeAttachment, error) {
var volumeAttachmentList storagev1.VolumeAttachmentList
if err := kubeClient.List(ctx, &volumeAttachmentList, client.MatchingFields{"spec.nodeName": node.Name}); err != nil {
return nil, fmt.Errorf("listing volumeAttachments, %w", err)
}
return lo.ToSlicePtr(volumeAttachmentList.Items), nil
}

func GetCondition(n *corev1.Node, match corev1.NodeConditionType) corev1.NodeCondition {
for _, condition := range n.Status.Conditions {
if condition.Type == match {
Expand Down
14 changes: 10 additions & 4 deletions pkg/utils/pod/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,20 @@ func IsEvictable(pod *corev1.Pod) bool {

// IsWaitingEviction checks if this is a pod that we are waiting to be removed from the node by ensuring that the pod:
// - Isn't a terminal pod (Failed or Succeeded)
// - Isn't a pod that has been terminating past its terminationGracePeriodSeconds
// - Can be drained by Karpenter (See IsDrainable)
func IsWaitingEviction(pod *corev1.Pod, clk clock.Clock) bool {
return !IsTerminal(pod) &&
IsDrainable(pod, clk)
}

// IsDrainable checks if a pod can be drained by Karpenter by ensuring that the pod:
// - Doesn't tolerate the "karpenter.sh/disruption=disrupting" taint
// - Isn't a pod that has been terminating past its terminationGracePeriodSeconds
// - Isn't a mirror pod (https://kubernetes.io/docs/tasks/configure-pod-container/static-pod/)
// Note: pods with the `karpenter.sh/do-not-disrupt` annotation are included since node drain should stall until these pods are evicted or become terminal, even though Karpenter won't orchestrate the eviction.
func IsWaitingEviction(pod *corev1.Pod, clk clock.Clock) bool {
return !IsTerminal(pod) &&
func IsDrainable(pod *corev1.Pod, clk clock.Clock) bool {
return !ToleratesDisruptedNoScheduleTaint(pod) &&
!IsStuckTerminating(pod, clk) &&
!ToleratesDisruptedNoScheduleTaint(pod) &&
// Mirror pods cannot be deleted through the API server since they are created and managed by kubelet
// This means they are effectively read-only and can't be controlled by API server calls
// https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#drain
Expand Down

0 comments on commit 37ca1c2

Please sign in to comment.