Dennisme/rebase pod events #631

Closed
wants to merge 5 commits into from
2 changes: 1 addition & 1 deletion cmd/node-termination-handler.go
@@ -401,7 +401,7 @@ func cordonNode(node node.Node, nodeName string, drainEvent *monitor.Interruptio
}

func cordonAndDrainNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder, sqsTerminationDraining bool) error {
err := node.CordonAndDrain(nodeName, drainEvent.Description)
err := node.CordonAndDrain(nodeName, drainEvent.Description, recorder.EventRecorder)
if err != nil {
if errors.IsNotFound(err) {
log.Err(err).Msgf("node '%s' not found in the cluster", nodeName)
33 changes: 32 additions & 1 deletion pkg/node/node.go
@@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@@ -64,6 +65,13 @@ const (
maxTaintValueLength = 63
)

const (
// PodEvictReason is the event reason emitted for Pod evictions during node drain
PodEvictReason = "PodEviction"
// PodEvictMsg is the event message emitted for Pod evictions during node drain
PodEvictMsg = "Pod evicted due to node drain"
Contributor

Is this sufficient detail for the event? Do you need it to specify that node-termination-handler initiated the drain in response to a particular AWS event (Spot ITN, ASG event, etc.)?
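
One possible way to carry more detail, sketched under assumptions: `AnnotatedEventf` takes a format string plus arguments, and `CordonAndDrain` already receives a `reason` string, so the message could include it. The `eventRecorder` stand-in, `captureRecorder`, `podEvictMsgFmt`, and the sample reason text below are hypothetical and only keep the sketch free of Kubernetes dependencies.

```go
package main

import "fmt"

// eventRecorder is a hypothetical, stdlib-only stand-in for the recorder used
// in this PR; it keeps only the format-string part of AnnotatedEventf.
type eventRecorder interface {
	Eventf(eventType, reason, messageFmt string, args ...interface{})
}

// captureRecorder stores formatted events so they can be inspected.
type captureRecorder struct {
	events []string
}

func (c *captureRecorder) Eventf(eventType, reason, messageFmt string, args ...interface{}) {
	msg := fmt.Sprintf(messageFmt, args...)
	c.events = append(c.events, fmt.Sprintf("%s %s %s", eventType, reason, msg))
}

const (
	podEvictReason = "PodEviction"
	// podEvictMsgFmt is a hypothetical variant of PodEvictMsg that names the
	// component that started the drain and the reason it was given.
	podEvictMsgFmt = "Pod evicted due to node drain initiated by node-termination-handler: %s"
)

// emitEviction records one eviction event that includes the drain reason.
func emitEviction(r eventRecorder, drainReason string) {
	r.Eventf("Normal", podEvictReason, podEvictMsgFmt, drainReason)
}

func main() {
	rec := &captureRecorder{}
	emitEviction(rec, "Spot ITN received") // sample reason text, made up for illustration
	fmt.Println(rec.events[0])
}
```

Because the new message still begins with "Pod evicted due to node drain", a substring check like the one in `TestDrainSuccess` would keep matching.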

)

var (
maxRetryDeadline time.Duration = 5 * time.Second
conflictRetryInterval time.Duration = 750 * time.Millisecond
@@ -95,7 +103,7 @@ func NewWithValues(nthConfig config.Config, drainHelper *drain.Helper, uptime up
}

// CordonAndDrain will cordon the node and evict pods based on the config
func (n Node) CordonAndDrain(nodeName string, reason string) error {
func (n Node) CordonAndDrain(nodeName string, reason string, recorder recorderInterface) error {
if n.nthConfig.DryRun {
log.Info().Str("node_name", nodeName).Str("reason", reason).Msg("Node would have been cordoned and drained, but dry-run flag was set.")
return nil
@@ -114,6 +122,25 @@ func (n Node) CordonAndDrain(nodeName string, reason string) error {
if err != nil {
return err
}
// Emit events for all pods that will be evicted
if recorder != nil {
pods, err := n.fetchAllPods(nodeName)
if err == nil {
for _, pod := range pods.Items {
podRef := &corev1.ObjectReference{
Kind: "Pod",
Name: pod.Name,
Namespace: pod.Namespace,
}
annotations := make(map[string]string)
annotations["node"] = nodeName
for k, v := range pod.GetLabels() {
annotations[k] = v
}
recorder.AnnotatedEventf(podRef, annotations, "Normal", PodEvictReason, PodEvictMsg)
Contributor

Should "Normal" also be in a constant, like PodEvictReason and PodEvictMsg? What are the other possible values for that argument?
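
For reference, the Kubernetes core/v1 API currently defines two event types, exposed in Go as `corev1.EventTypeNormal` ("Normal") and `corev1.EventTypeWarning` ("Warning"), so the existing constant could be passed instead of a literal. The sketch below mirrors those two values with local constants only to stay dependency-free; `isValidEventType` is a hypothetical helper, not part of this PR.

```go
package main

import "fmt"

// Local mirrors of corev1.EventTypeNormal and corev1.EventTypeWarning from
// k8s.io/api/core/v1, redeclared here only to avoid external imports.
const (
	eventTypeNormal  = "Normal"
	eventTypeWarning = "Warning"
)

// isValidEventType is a hypothetical helper that reports whether s is one of
// the two event types listed above.
func isValidEventType(s string) bool {
	switch s {
	case eventTypeNormal, eventTypeWarning:
		return true
	default:
		return false
	}
}

func main() {
	for _, t := range []string{eventTypeNormal, eventTypeWarning, "Info"} {
		fmt.Printf("%-8s valid=%t\n", t, isValidEventType(t))
	}
}
```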

}
}
}
err = drain.RunNodeDrain(n.drainHelper, node.Name)
if err != nil {
return err
@@ -800,3 +827,7 @@ func filterPodForDeletion(podName string) func(pod corev1.Pod) drain.PodDeleteSt
return drain.MakePodDeleteStatusOkay()
}
}

type recorderInterface interface {
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
Contributor

nit: eventtype -> eventType

Author

Yes, good catch. I agree.

}
43 changes: 40 additions & 3 deletions pkg/node/node_test.go
@@ -16,6 +16,7 @@ package node_test
import (
"context"
"strconv"
"strings"
"testing"
"time"

@@ -28,6 +29,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/kubectl/pkg/drain"
)

@@ -61,7 +63,8 @@ func TestDryRun(t *testing.T) {
tNode, err := node.New(config.Config{DryRun: true})
h.Ok(t, err)

err = tNode.CordonAndDrain(nodeName, "cordonReason")
fakeRecorder := record.NewFakeRecorder(100)
Contributor

  • Make this const please
  • Is it necessary for the buffer size to be this large?

Author

Probably not, but if it is set too small the tests will hang indefinitely.
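
The hang is consistent with how a buffered channel behaves, assuming the fake recorder sends each event into a channel sized by the argument to `record.NewFakeRecorder`: once the buffer is full and nothing is reading, the next send blocks. A minimal stdlib-only sketch of that behavior, where `trySend` is a hypothetical helper that uses a non-blocking send to show the point at which a plain send would stall:

```go
package main

import "fmt"

// trySend reports whether msg could be queued without blocking.
func trySend(events chan string, msg string) bool {
	select {
	case events <- msg:
		return true
	default:
		// The buffer is full and nobody is receiving; a plain
		// `events <- msg` would block at this point.
		return false
	}
}

func main() {
	events := make(chan string, 2) // stand-in for a recorder buffer of size 2
	for i := 1; i <= 3; i++ {
		queued := trySend(events, fmt.Sprintf("event-%d", i))
		fmt.Printf("event %d queued: %t\n", i, queued)
	}
}
```

Under that assumption, the buffer only needs to be at least as large as the number of events a test emits before it starts reading.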

err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder)
h.Ok(t, err)

err = tNode.Cordon(nodeName, "cordonReason")
@@ -98,6 +101,7 @@ func TestNewFailure(t *testing.T) {
}

func TestDrainSuccess(t *testing.T) {
controllerBool := true
Contributor

Update to be more descriptive, e.g. isOwnerController

Author

Good call

client := fake.NewSimpleClientset()
_, err := client.CoreV1().Nodes().Create(
context.Background(),
@@ -106,14 +110,47 @@ func TestDrainSuccess(t *testing.T) {
},
metav1.CreateOptions{})
h.Ok(t, err)

_, err = client.CoreV1().Pods("default").Create(
context.Background(),
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "cool-app-pod-",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Name: "cool-app",
Kind: "ReplicaSet",
Controller: &controllerBool,
},
},
},
Spec: v1.PodSpec{
NodeName: nodeName,
},
},
metav1.CreateOptions{})
h.Ok(t, err)

fakeRecorder := record.NewFakeRecorder(100)

tNode := getNode(t, getDrainHelper(client))
err = tNode.CordonAndDrain(nodeName, "cordonReason")
err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder)
h.Ok(t, err)
close(fakeRecorder.Events)
Contributor @brycahta, May 9, 2022

Is there a reason to explicitly close this channel? If so, please update and use defer in case h.Ok fails.
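
For context, `for event := range ch` only terminates once the channel is closed, which is presumably why the `close` is there. One alternative that avoids closing at all is a non-blocking drain of whatever is already buffered. This is a sketch under the assumption that every event has been sent by the time `CordonAndDrain` returns; `drainBuffered` and `containsEvent` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// drainBuffered reads whatever is currently queued and returns as soon as the
// channel is empty (or closed), so the caller never has to close it.
func drainBuffered(events <-chan string) []string {
	var got []string
	for {
		select {
		case e, ok := <-events:
			if !ok {
				return got // channel was closed by someone else
			}
			got = append(got, e)
		default:
			return got // nothing left in the buffer
		}
	}
}

// containsEvent reports whether any recorded event contains want.
func containsEvent(events []string, want string) bool {
	for _, e := range events {
		if strings.Contains(e, want) {
			return true
		}
	}
	return false
}

func main() {
	events := make(chan string, 10)
	events <- "Normal PodEviction Pod evicted due to node drain"
	got := drainBuffered(events)
	fmt.Println(containsEvent(got, "Normal PodEviction Pod evicted due to node drain"))
}
```

If the explicit close is kept instead, deferring it right after the recorder is created would cover the early-exit case raised above.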

expectedEventArrived := false
for event := range fakeRecorder.Events {
if strings.Contains(event, "Normal PodEviction Pod evicted due to node drain") {
expectedEventArrived = true
}
}
h.Assert(t, expectedEventArrived, "PodEvicted event was not emitted")
}

func TestDrainCordonNodeFailure(t *testing.T) {
fakeRecorder := record.NewFakeRecorder(100)
tNode := getNode(t, getDrainHelper(fake.NewSimpleClientset()))
err := tNode.CordonAndDrain(nodeName, "cordonReason")
err := tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder)
h.Assert(t, true, "Failed to return error on CordonAndDrain failing to cordon node", err != nil)
}

2 changes: 1 addition & 1 deletion pkg/observability/k8s-events.go
@@ -113,7 +113,7 @@ func InitK8sEventRecorder(enabled bool, nodeName string, sqsMode bool, nodeMetad
}

broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("default")})
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
Contributor

Can you provide some context on this change? What is the difference in behavior between "default" and ""?


return K8sEventRecorder{
annotations: annotations,