Skip to content

Commit

Permalink
feat: Annotate pod events with workflow name and UID (#6455)
Browse files Browse the repository at this point in the history
Signed-off-by: Joseph McGovern <joseph.mcgovern@workiva.com>
  • Loading branch information
josephmcgovern-wf authored Aug 6, 2021
1 parent c6b6f67 commit 3373dc5
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 4 deletions.
14 changes: 14 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,20 @@ func (w *When) CreateConfigMap(name string, data map[string]string) *When {
return w
}

func (w *When) UpdateConfigMap(name string, data map[string]string) *When {
w.t.Helper()

ctx := context.Background()
_, err := w.kubeClient.CoreV1().ConfigMaps(Namespace).Update(ctx, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: name, Labels: map[string]string{Label: "true"}},
Data: data,
}, metav1.UpdateOptions{})
if err != nil {
w.t.Fatal(err)
}
return w
}

func (w *When) DeleteConfigMap(name string) *When {
w.t.Helper()
ctx := context.Background()
Expand Down
77 changes: 77 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
package e2e

import (
"context"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
apiv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -186,6 +189,80 @@ spec:
})
}

func (s *FunctionalSuite) TestEventOnNodeFailSentAsPod() {
// Test whether an WorkflowFailed event (with appropriate message) is emitted in case of node failure
var uid types.UID
var nodeId types.UID
var nodeName string
// Update controller config map to set nodeEvents.sendAsPod to true
ctx := context.Background()
configMap, _ := s.KubeClient.CoreV1().ConfigMaps("argo").Get(
ctx,
"workflow-controller-configmap",
metav1.GetOptions{},
)
originalData := make(map[string]string)
for key, value := range configMap.Data {
originalData[key] = value
}
configMap.Data["nodeEvents"] = "\n sendAsPod: true"
s.Given().
Workflow("@expectedfailures/failed-step-event.yaml").
When().
UpdateConfigMap(
"workflow-controller-configmap",
configMap.Data).
// Give controller enough time to update from config map change
Wait(5*time.Second).
SubmitWorkflow().
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
uid = metadata.UID
}).
ExpectWorkflowNode(func(status wfv1.NodeStatus) bool {
return strings.HasPrefix(status.Name, "failed-step-event-")
}, func(t *testing.T, status *wfv1.NodeStatus, pod *apiv1.Pod) {
nodeId = pod.UID
nodeName = status.Name
}).
ExpectAuditEvents(
func(event apiv1.Event) bool {
return (event.InvolvedObject.Kind == workflow.WorkflowKind && event.InvolvedObject.UID == uid) || (event.InvolvedObject.Kind == "Pod" && event.InvolvedObject.UID == nodeId && strings.HasPrefix(event.Reason, "Workflow"))
},
4,
func(t *testing.T, es []corev1.Event) {
for _, e := range es {
switch e.Reason {
case "WorkflowNodeRunning":
assert.Equal(t, e.InvolvedObject.Kind, "Pod")
assert.Contains(t, e.Message, "Running node failed-step-event-")
assert.Equal(t, e.Annotations["workflows.argoproj.io/node-name"], nodeName)
assert.Equal(t, e.Annotations["workflows.argoproj.io/workflow-uid"], string(uid))
assert.Contains(t, e.Annotations["workflows.argoproj.io/workflow-name"], "failed-step-event-")
case "WorkflowRunning":
case "WorkflowNodeFailed":
assert.Equal(t, e.InvolvedObject.Kind, "Pod")
assert.Contains(t, e.Message, "Failed node failed-step-event-")
assert.Equal(t, e.Annotations["workflows.argoproj.io/node-type"], "Pod")
assert.Equal(t, e.Annotations["workflows.argoproj.io/node-name"], nodeName)
assert.Contains(t, e.Annotations["workflows.argoproj.io/workflow-name"], "failed-step-event-")
assert.Equal(t, e.Annotations["workflows.argoproj.io/workflow-uid"], string(uid))
case "WorkflowFailed":
assert.Contains(t, e.Message, "exit code 1")
default:
assert.Fail(t, e.Reason)
}
}
},
).
When().
// Reset config map to original settings
UpdateConfigMap("workflow-controller-configmap", originalData).
// Give controller enough time to update from config map change
Wait(5 * time.Second)
}

func (s *FunctionalSuite) TestEventOnNodeFail() {
// Test whether an WorkflowFailed event (with appropriate message) is emitted in case of node failure
var uid types.UID
Expand Down
5 changes: 5 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ const (
// was scheduled to run by CronWorkflow.
AnnotationKeyCronWfScheduledTime = workflow.WorkflowFullName + "/scheduled-time"

// AnnotationKeyWorkflowName is the name of the workflow
AnnotationKeyWorkflowName = workflow.WorkflowFullName + "/workflow-name"
// AnnotationKeyWorkflowUID is the uid of the workflow
AnnotationKeyWorkflowUID = workflow.WorkflowFullName + "/workflow-uid"

// LabelKeyControllerInstanceID is the label the controller will carry forward to workflows/pod labels
// for the purposes of workflow segregation
LabelKeyControllerInstanceID = workflow.WorkflowFullName + "/controller-instanceid"
Expand Down
11 changes: 7 additions & 4 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2183,6 +2183,10 @@ func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) {
eventType = apiv1.EventTypeNormal
}
eventConfig := woc.controller.Config.NodeEvents
annotations := map[string]string{
common.AnnotationKeyNodeType: string(node.Type),
common.AnnotationKeyNodeName: node.Name,
}
var involvedObject runtime.Object = woc.wf
if eventConfig.SendAsPod {
pod, err := woc.getPodByNode(node)
Expand All @@ -2191,14 +2195,13 @@ func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) {
}
if pod != nil {
involvedObject = pod
annotations[common.AnnotationKeyWorkflowName] = woc.wf.Name
annotations[common.AnnotationKeyWorkflowUID] = string(woc.wf.GetUID())
}
}
woc.eventRecorder.AnnotatedEventf(
involvedObject,
map[string]string{
common.AnnotationKeyNodeType: string(node.Type),
common.AnnotationKeyNodeName: node.Name,
},
annotations,
eventType,
fmt.Sprintf("WorkflowNode%s", node.Phase),
message,
Expand Down

0 comments on commit 3373dc5

Please sign in to comment.