Skip to content

Commit

Permalink
fix: Ensure volumes are unpublished before deleting node
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSirenko committed Jul 5, 2024
1 parent e356a51 commit 51ea296
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 1 deletion.
26 changes: 26 additions & 0 deletions pkg/controllers/node/termination/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
volumeattachmentutils "sigs.k8s.io/karpenter/pkg/utils/volumeattachment"
)

// Controller for the resource
Expand Down Expand Up @@ -107,6 +108,15 @@ func (c *Controller) finalize(ctx context.Context, node *v1.Node) (reconcile.Res
}
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
// In order for stateful pods to smoothly migrate from the terminating Node, we wait for VolumeAttachments
// of drain-able pods to be cleaned up before terminating the node and removing it from the cluster.
isNoVolumeAttachments, err := c.ensureNoVolumeAttachments(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
}
if !isNoVolumeAttachments {
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
nodeClaims, err := nodeutils.GetNodeClaims(ctx, node, c.kubeClient)
if err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
Expand Down Expand Up @@ -150,6 +160,22 @@ func (c *Controller) deleteAllNodeClaims(ctx context.Context, node *v1.Node) err
return nil
}

func (c *Controller) ensureNoVolumeAttachments(ctx context.Context, node *v1.Node) (bool, error) {
volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
if err != nil {
return false, err
}
// Filter out volume attachments associated with non-drain-able nodes or multi-attachable volumes
filteredVolumeAttachments, err := volumeattachmentutils.FilterVolumeAttachments(ctx, c.kubeClient, node, volumeAttachments)
if err != nil {
return false, err
}
if len(filteredVolumeAttachments) > 0 {
return false, nil
}
return true, nil
}

func (c *Controller) removeFinalizer(ctx context.Context, n *v1.Node) error {
stored := n.DeepCopy()
controllerutil.RemoveFinalizer(n, v1beta1.TerminationFinalizer)
Expand Down
5 changes: 5 additions & 0 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"time"

storagev1 "k8s.io/api/storage/v1"

"github.com/awslabs/operatorpkg/controller"
opmetrics "github.com/awslabs/operatorpkg/metrics"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -204,6 +206,9 @@ func NewOperator() (context.Context, *Operator) {
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "spec.nodeClassRef.name", func(o client.Object) []string {
return []string{o.(*v1beta1.NodeClaim).Spec.NodeClassRef.Name}
}), "failed to setup nodeclaim nodeclassref name indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &storagev1.VolumeAttachment{}, "spec.nodeName", func(o client.Object) []string {
return []string{o.(*storagev1.VolumeAttachment).Spec.NodeName}
}), "failed to setup volumeattachment indexer")

lo.Must0(mgr.AddReadyzCheck("manager", func(req *http.Request) error {
return lo.Ternary(mgr.GetCache().WaitForCacheSync(req.Context()), nil, fmt.Errorf("failed to sync caches"))
Expand Down
29 changes: 28 additions & 1 deletion pkg/utils/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
"context"
"fmt"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
storagev1 "k8s.io/api/storage/v1"

"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/utils/pod"
)

Expand Down Expand Up @@ -75,6 +76,32 @@ func GetProvisionablePods(ctx context.Context, kubeClient client.Client) ([]*v1.
}), nil
}

// GetTolerantPods grabs all pods from passed nodes that don't satisfy the ToleratesDisruptionNoScheduleTaint criteria
func GetTolerantPods(ctx context.Context, kubeClient client.Client, nodes ...*v1.Node) ([]*v1.Pod, error) {
pods, err := GetPods(ctx, kubeClient, nodes...)
if err != nil {
return nil, fmt.Errorf("listing pods, %w", err)
}
return lo.Reject(pods, func(p *v1.Pod, _ int) bool {
return pod.ToleratesDisruptionNoScheduleTaint(p)
}), nil
}

// GetVolumeAttachments grabs all volumeAttachments of passed node
func GetVolumeAttachments(ctx context.Context, kubeClient client.Client, node *v1.Node) ([]*storagev1.VolumeAttachment, error) {
var volumeAttachments []*storagev1.VolumeAttachment
var volumeAttachmentList storagev1.VolumeAttachmentList
if err := kubeClient.List(ctx, &volumeAttachmentList, client.MatchingFields{"spec.nodeName": node.Name}); err != nil {
return nil, fmt.Errorf("listing volumeattachments, %w", err)
}
for i := range volumeAttachmentList.Items {
if volumeAttachmentList.Items[i].Spec.NodeName == node.Name {
volumeAttachments = append(volumeAttachments, &volumeAttachmentList.Items[i])
}
}
return volumeAttachments, nil
}

func GetCondition(n *v1.Node, match v1.NodeConditionType) v1.NodeCondition {
for _, condition := range n.Status.Conditions {
if condition.Type == match {
Expand Down
74 changes: 74 additions & 0 deletions pkg/utils/volumeattachment/volumeattachment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package volumeattachment

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

nodeutil "sigs.k8s.io/karpenter/pkg/utils/node"
volumeutil "sigs.k8s.io/karpenter/pkg/utils/volume"
)

// FilterVolumeAttachments filters out volumeAttachments that should not block the termination of the passed node
func FilterVolumeAttachments(ctx context.Context, kubeClient client.Client, node *v1.Node, volumeAttachments []*storagev1.VolumeAttachment) ([]*storagev1.VolumeAttachment, error) {
var filteredVolumeAttachments []*storagev1.VolumeAttachment
// No need to filter empty volumeAttachments list
if len(volumeAttachments) == 0 {
return volumeAttachments, nil
}
// Filter out non-drain-able pods
tolerantPods, err := nodeutil.GetTolerantPods(ctx, kubeClient, node)
if err != nil {
return nil, fmt.Errorf("listing pods on node, %w", err)
}
// Filter out Multi-Attach volumes
shouldFilterOutVolume := make(map[string]bool)
for _, p := range tolerantPods {
for _, v := range p.Spec.Volumes {
pvc, err := volumeutil.GetPersistentVolumeClaim(ctx, kubeClient, p, v)
if err != nil {
return nil, fmt.Errorf("getting persistent volume claim, %w", err)
}
if pvc != nil {
shouldFilterOutVolume[pvc.Spec.VolumeName] = CannotMultiAttach(*pvc)
}
}
}
for i := range volumeAttachments {
pvName := volumeAttachments[i].Spec.Source.PersistentVolumeName
if pvName != nil && shouldFilterOutVolume[*pvName] {
filteredVolumeAttachments = append(filteredVolumeAttachments, volumeAttachments[i])
}
}
return filteredVolumeAttachments, nil
}

// CannotMultiAttach returns true if the persistentVolumeClaim's underlying volume cannot be attached to multiple nodes
// i.e. its access mode is not ReadWriteOnce/ReadWriteOncePod
func CannotMultiAttach(pvc v1.PersistentVolumeClaim) bool {
for _, accessMode := range pvc.Spec.AccessModes {
if accessMode == v1.ReadWriteOnce || accessMode == v1.ReadWriteOncePod {
return true
}
}
return false
}

0 comments on commit 51ea296

Please sign in to comment.