Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose nodeclaim disruption through new disruption condition, improves pod eviction event message #1370

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/apis/v1/nodeclaim_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
ConditionTypeDrifted = "Drifted"
ConditionTypeInstanceTerminating = "InstanceTerminating"
ConditionTypeConsistentStateFound = "ConsistentStateFound"
ConditionTypeDisruptionReason = "DisruptionReason"
)

// NodeClaimStatus defines the observed state of NodeClaim
Expand Down
59 changes: 42 additions & 17 deletions pkg/controllers/disruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
"go.uber.org/multierr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/utils/clock"
Expand All @@ -36,9 +37,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool"
"sigs.k8s.io/karpenter/pkg/utils/pretty"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
Expand All @@ -49,6 +47,8 @@ import (
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/injection"
operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging"
nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool"
"sigs.k8s.io/karpenter/pkg/utils/pretty"
)

type Controller struct {
Expand Down Expand Up @@ -124,14 +124,21 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
// Karpenter taints nodes with a karpenter.sh/disruption taint as part of the disruption process while it progresses in memory.
// If Karpenter restarts or fails with an error during a disruption action, some nodes can be left tainted.
// Idempotently remove this taint from candidates that are not in the orchestration queue before continuing.
if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, false, lo.Filter(c.cluster.Nodes(), func(s *state.StateNode, _ int) bool {
return !c.queue.HasAny(s.ProviderID())
})...); err != nil {
outdatedNodes := lo.Filter(c.cluster.Nodes(), func(s *state.StateNode, _ int) bool {
return !c.queue.HasAny(s.ProviderID()) && !s.Deleted()
})
if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, false, outdatedNodes...); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("removing taint %s from nodes, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
}
if err := state.ClearNodeClaimsCondition(ctx, c.kubeClient, v1.ConditionTypeDisruptionReason, outdatedNodes...); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("removing %s condition from nodeclaims, %w", v1.ConditionTypeDisruptionReason, err)
}

// Attempt different disruption methods. We'll only let one method perform an action
for _, m := range c.methods {
Expand Down Expand Up @@ -197,12 +204,9 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
commandID := uuid.NewUUID()
log.FromContext(ctx).WithValues("command-id", commandID, "reason", strings.ToLower(string(m.Reason()))).Info(fmt.Sprintf("disrupting nodeclaim(s) via %s", cmd))

stateNodes := lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode {
return c.StateNode
})
// Cordon the old nodes before we launch the replacements to prevent new pods from scheduling to the old nodes
if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, true, stateNodes...); err != nil {
return fmt.Errorf("tainting nodes with %s (command-id: %s), %w", pretty.Taint(v1.DisruptedNoScheduleTaint), commandID, err)
if err := c.MarkDisrupted(ctx, m, cmd.candidates...); err != nil {
return fmt.Errorf("marking disrupted (command-id: %s), %w", commandID, err)
cnmcavoy marked this conversation as resolved.
Show resolved Hide resolved
}

var nodeClaimNames []string
Expand All @@ -226,12 +230,9 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
// the node is cleaned up.
schedulingResults.Record(log.IntoContext(ctx, operatorlogging.NopLogger), c.recorder, c.cluster)

providerIDs := lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() })
// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
c.cluster.MarkForDeletion(providerIDs...)

if err = c.queue.Add(orchestration.NewCommand(nodeClaimNames,
lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode }), commandID, m.Reason(), m.ConsolidationType())); err != nil {
statenodes := lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode })
if err := c.queue.Add(orchestration.NewCommand(nodeClaimNames, statenodes, commandID, m.Reason(), m.ConsolidationType())); err != nil {
providerIDs := lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() })
c.cluster.UnmarkForDeletion(providerIDs...)
return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, err)
}
Expand All @@ -258,6 +259,30 @@ func (c *Controller) createReplacementNodeClaims(ctx context.Context, m Method,
return nodeClaimNames, nil
}

func (c *Controller) MarkDisrupted(ctx context.Context, m Method, candidates ...*Candidate) error {
stateNodes := lo.Map(candidates, func(c *Candidate, _ int) *state.StateNode {
return c.StateNode
})
if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, true, stateNodes...); err != nil {
return fmt.Errorf("tainting nodes with %s: %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
}

providerIDs := lo.Map(candidates, func(c *Candidate, _ int) string { return c.ProviderID() })
c.cluster.MarkForDeletion(providerIDs...)

return multierr.Combine(lo.Map(candidates, func(candidate *Candidate, _ int) error {
// refresh nodeclaim before updating status
nodeClaim := &v1.NodeClaim{}

if err := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(candidate.NodeClaim), nodeClaim); err != nil {
return client.IgnoreNotFound(err)
}
stored := nodeClaim.DeepCopy()
nodeClaim.StatusConditions().SetTrueWithReason(v1.ConditionTypeDisruptionReason, v1.ConditionTypeDisruptionReason, string(m.Reason()))
return client.IgnoreNotFound(c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFrom(stored)))
})...)
}

func (c *Controller) recordRun(s string) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/disruption/orchestration/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
consolidationTypeLabel: cmd.consolidationType,
})
multiErr := multierr.Combine(err, cmd.lastError, state.RequireNoScheduleTaint(ctx, q.kubeClient, false, cmd.candidates...))
multiErr = multierr.Combine(multiErr, state.ClearNodeClaimsCondition(ctx, q.kubeClient, v1.ConditionTypeDisruptionReason, cmd.candidates...))
// Log the error
log.FromContext(ctx).WithValues("nodes", strings.Join(lo.Map(cmd.candidates, func(s *state.StateNode, _ int) string {
return s.Name()
Expand Down
19 changes: 19 additions & 0 deletions pkg/controllers/disruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ var _ = Describe("Disruption Taints", func() {
})
nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("Never")
node.Spec.Taints = append(node.Spec.Taints, v1.DisruptedNoScheduleTaint)
nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDisruptionReason)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod)
ExpectManualBinding(ctx, env.Client, pod, node)

Expand All @@ -542,6 +543,12 @@ var _ = Describe("Disruption Taints", func() {
ExpectSingletonReconciled(ctx, disruptionController)
node = ExpectNodeExists(ctx, env.Client, node.Name)
Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptedNoScheduleTaint))

nodeClaims := lo.Filter(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool {
return nc.Status.ProviderID == node.Spec.ProviderID
})
Expect(nodeClaims).To(HaveLen(1))
Expect(nodeClaims[0].StatusConditions().Get(v1.ConditionTypeDisruptionReason)).To(BeNil())
})
It("should add and remove taints from NodeClaims that fail to disrupt", func() {
nodePool.Spec.Disruption.ConsolidationPolicy = v1.ConsolidationPolicyWhenEmptyOrUnderutilized
Expand Down Expand Up @@ -579,6 +586,12 @@ var _ = Describe("Disruption Taints", func() {

node = ExpectNodeExists(ctx, env.Client, node.Name)
Expect(node.Spec.Taints).To(ContainElement(v1.DisruptedNoScheduleTaint))
nodeClaims := lo.Filter(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool {
return nc.Status.ProviderID == node.Spec.ProviderID
})
Expect(nodeClaims).To(HaveLen(1))
Expect(nodeClaims[0].StatusConditions().Get(v1.ConditionTypeDisruptionReason)).ToNot(BeNil())
Expect(nodeClaims[0].StatusConditions().Get(v1.ConditionTypeDisruptionReason).IsTrue()).To(BeTrue())

createdNodeClaim := lo.Reject(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool {
return nc.Name == nodeClaim.Name
Expand All @@ -595,6 +608,12 @@ var _ = Describe("Disruption Taints", func() {

node = ExpectNodeExists(ctx, env.Client, node.Name)
Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptedNoScheduleTaint))

nodeClaims = lo.Filter(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool {
return nc.Status.ProviderID == node.Spec.ProviderID
})
Expect(nodeClaims).To(HaveLen(1))
Expect(nodeClaims[0].StatusConditions().Get(v1.ConditionTypeDisruptionReason)).To(BeNil())
})
})

Expand Down
14 changes: 7 additions & 7 deletions pkg/controllers/node/termination/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ var _ = Describe("Termination", func() {
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
Expect(queue.Has(podSkip)).To(BeFalse())
Expect(queue.Has(node, podSkip)).To(BeFalse())
ExpectSingletonReconciled(ctx, queue)

// Expect node to exist and be draining
Expand Down Expand Up @@ -233,7 +233,7 @@ var _ = Describe("Termination", func() {
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
Expect(queue.Has(podSkip)).To(BeFalse())
Expect(queue.Has(node, podSkip)).To(BeFalse())
ExpectSingletonReconciled(ctx, queue)

// Expect node to exist and be draining
Expand All @@ -243,7 +243,7 @@ var _ = Describe("Termination", func() {
EventuallyExpectTerminating(ctx, env.Client, podEvict)
ExpectDeleted(ctx, env.Client, podEvict)

Expect(queue.Has(podSkip)).To(BeFalse())
Expect(queue.Has(node, podSkip)).To(BeFalse())

// Reconcile to delete node
node = ExpectNodeExists(ctx, env.Client, node.Name)
Expand Down Expand Up @@ -357,13 +357,13 @@ var _ = Describe("Termination", func() {
ExpectNodeWithNodeClaimDraining(env.Client, node.Name)

// Expect podNoEvict to be added to the queue
Expect(queue.Has(podNoEvict)).To(BeTrue())
Expect(queue.Has(node, podNoEvict)).To(BeTrue())

// Attempt to evict the pod, but fail to do so
ExpectSingletonReconciled(ctx, queue)

// Expect podNoEvict to fail eviction due to PDB, and be retried
Expect(queue.Has(podNoEvict)).To(BeTrue())
Expect(queue.Has(node, podNoEvict)).To(BeTrue())

// Delete pod to simulate successful eviction
ExpectDeleted(ctx, env.Client, podNoEvict)
Expand Down Expand Up @@ -507,7 +507,7 @@ var _ = Describe("Termination", func() {
ExpectSingletonReconciled(ctx, queue)

// Expect mirror pod to not be queued for eviction
Expect(queue.Has(podNoEvict)).To(BeFalse())
Expect(queue.Has(node, podNoEvict)).To(BeFalse())

// Expect podEvict to be enqueued for eviction then be successful
EventuallyExpectTerminating(ctx, env.Client, podEvict)
Expand Down Expand Up @@ -681,7 +681,7 @@ var _ = Describe("Termination", func() {
ExpectObjectReconciled(ctx, env.Client, terminationController, node)

// Expect that the old pod's key still exists in the queue
Expect(queue.Has(pod)).To(BeTrue())
Expect(queue.Has(node, pod)).To(BeTrue())

// Re-create the pod and node, it should now have the same name, but a different UUID
node = test.Node(test.NodeOptions{
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/node/termination/terminator/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
"sigs.k8s.io/karpenter/pkg/events"
)

func EvictPod(pod *corev1.Pod) events.Event {
func EvictPod(pod *corev1.Pod, message string) events.Event {
return events.Event{
InvolvedObject: pod,
Type: corev1.EventTypeNormal,
Reason: "Evicted",
Message: "Evicted pod",
Message: "Evicted pod: " + message,
DedupeValues: []string{pod.Name},
}
}
Expand Down
35 changes: 28 additions & 7 deletions pkg/controllers/node/termination/terminator/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/operator/injection"
"sigs.k8s.io/karpenter/pkg/utils/node"
)

const (
Expand All @@ -68,13 +70,15 @@ func IsNodeDrainError(err error) bool {

type QueueKey struct {
types.NamespacedName
UID types.UID
UID types.UID
providerID string
}

func NewQueueKey(pod *corev1.Pod) QueueKey {
func NewQueueKey(pod *corev1.Pod, providerID string) QueueKey {
return QueueKey{
NamespacedName: client.ObjectKeyFromObject(pod),
UID: pod.UID,
providerID: providerID,
}
}

Expand Down Expand Up @@ -118,24 +122,24 @@ func (q *Queue) Register(_ context.Context, m manager.Manager) error {
}

// Add adds pods to the Queue
func (q *Queue) Add(pods ...*corev1.Pod) {
func (q *Queue) Add(node *corev1.Node, pods ...*corev1.Pod) {
q.mu.Lock()
defer q.mu.Unlock()

for _, pod := range pods {
qk := NewQueueKey(pod)
qk := NewQueueKey(pod, node.Spec.ProviderID)
if !q.set.Has(qk) {
q.set.Insert(qk)
q.TypedRateLimitingInterface.Add(qk)
}
}
}

func (q *Queue) Has(pod *corev1.Pod) bool {
func (q *Queue) Has(node *corev1.Node, pod *corev1.Pod) bool {
q.mu.Lock()
defer q.mu.Unlock()

return q.set.Has(NewQueueKey(pod))
return q.set.Has(NewQueueKey(pod, node.Spec.ProviderID))
}

func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
Expand Down Expand Up @@ -171,6 +175,11 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
// Evict returns true if successful eviction call, and false if there was an eviction-related error
func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Pod", klog.KRef(key.Namespace, key.Name)))
evictionMessage, err := evictionReason(ctx, key, q.kubeClient)
if err != nil {
cnmcavoy marked this conversation as resolved.
Show resolved Hide resolved
// XXX(cmcavoy): this should be unreachable, but we log it if it happens
log.FromContext(ctx).V(1).Error(err, "failed looking up pod eviction reason")
}
if err := q.kubeClient.SubResource("eviction").Create(ctx,
&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: key.Namespace, Name: key.Name}},
&policyv1.Eviction{
Expand Down Expand Up @@ -205,6 +214,18 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
return false
}
NodesEvictionRequestsTotal.Inc(map[string]string{CodeLabel: "200"})
q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}))
q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}, evictionMessage))
return true
}

func evictionReason(ctx context.Context, key QueueKey, kubeClient client.Client) (string, error) {
nodeClaim, err := node.NodeClaimForNode(ctx, kubeClient, &corev1.Node{Spec: corev1.NodeSpec{ProviderID: key.providerID}})
if err != nil {
return "", err
}
terminationCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeDisruptionReason)
if terminationCondition.IsTrue() {
return terminationCondition.Message, nil
}
return "Forceful Termination", nil
}
Loading
Loading