[SHIPA-2788] Annotates events with Pod Name in the event of an Error type event #250

Merged (2 commits) on May 20, 2022

Changes from all commits
internal/api/v1beta1/app_types.go (4 changes: 3 additions & 1 deletion)
@@ -1009,7 +1009,8 @@ const (
DeploymentAnnotationInvolvedObjectName = "deployment.shipa.io/involved-object-name"
DeploymentAnnotationInvolvedObjectFieldPath = "deployment.shipa.io/involved-object-field-path"
DeploymentAnnotationSourceHost = "deployment.shipa.io/source-host"
- DeploymentAnnotationSourceComponent = "deployment.shipa.io/cource-component"
+ DeploymentAnnotationSourceComponent = "deployment.shipa.io/source-component"
+ DeploymentAnnotationPodErrorName = "deployment.shipa.io/pod-error-name"

AppReconcileStarted = "AppReconcileStarted"
AppReconcileComplete = "AppReconcileComplete"
@@ -1036,6 +1037,7 @@ type AppDeploymentEvent struct {
// Source is the source of an incoming event
SourceHost string
SourceComponent string
+ PodErrorName string
}

func AppDeploymentEventFromAnnotations(annotations map[string]string) *AppDeploymentEvent {
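The diff cuts off before the body of `AppDeploymentEventFromAnnotations`, but the intent of the new constant is simple: the failing pod's name rides along on the event's annotation map. A minimal, self-contained sketch of the consumer side — the constant value comes from the diff above; the surrounding program, the pod name, and the printing logic are illustrative assumptions only:

```go
package main

import "fmt"

// Annotation key as defined in app_types.go above.
const DeploymentAnnotationPodErrorName = "deployment.shipa.io/pod-error-name"

func main() {
	// Annotations as they might appear on an emitted AppReconcileError event.
	annotations := map[string]string{
		DeploymentAnnotationPodErrorName: "my-app-web-5",
	}

	// A consumer (UI, log pipeline, etc.) can now surface the failing pod directly.
	if pod := annotations[DeploymentAnnotationPodErrorName]; pod != "" {
		fmt.Printf("deployment error originated from pod %q\n", pod)
	}
}
```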
internal/controllers/app_controller.go (50 changes: 27 additions & 23 deletions)
@@ -372,7 +372,7 @@ func (r *AppReconciler) reconcile(ctx context.Context, app *ketchv1.App, logger
}

// retry until all pods for canary deployment comes to running state.
- if err := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[1].Version); err != nil {
+ if _, err := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[1].Version); len(app.Spec.Deployments) > 1 && err != nil {

if !timeoutExpired(app.Spec.Canary.Started, r.Now()) {
return appReconcileResult{
@@ -476,7 +476,7 @@ func (r *AppReconciler) watchDeployEvents(ctx context.Context, app *ketchv1.App,
if strings.Contains(err.Error(), "unknown (get events)") {
err = errors.WithMessagef(err, "assure clusterrole 'manager-role' has 'watch' permissions on event resources")
}
- watchErrorEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error watching deployments for workload %s: %s", wl.Name, err.Error()), process.Name)
+ watchErrorEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error watching deployments for workload %s: %s", wl.Name, err.Error()), process.Name, "")
recorder.AnnotatedEventf(app, watchErrorEvent.Annotations, v1.EventTypeWarning, watchErrorEvent.Reason, watchErrorEvent.Description)
return err
}
@@ -509,7 +509,7 @@ func (r *AppReconciler) watchDeployEvents(ctx context.Context, app *ketchv1.App,
// assign current cancelFunc and cancel the previous one
cleanup := r.CancelMap.replaceAndCancelPrevious(wl.Name, cancel)

- reconcileStartedEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileStarted, fmt.Sprintf("Updating units [%s]", process.Name), process.Name)
+ reconcileStartedEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileStarted, fmt.Sprintf("Updating units [%s]", process.Name), process.Name, "")
recorder.AnnotatedEventf(app, reconcileStartedEvent.Annotations, v1.EventTypeNormal, reconcileStartedEvent.Reason, reconcileStartedEvent.Description)
go r.watchFunc(ctx, cleanup, app, process.Name, recorder, watcher, cli, wl, timeout)
return nil
@@ -533,34 +533,34 @@ func (r *AppReconciler) watchFunc(ctx context.Context, cleanup cleanupFunc, app
for i := range wl.Conditions {
c := wl.Conditions[i]
if c.Type == DeploymentProgressing && c.Reason == deadlineExeceededProgressCond {
- deadlineExceededEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("deployment %q exceeded its progress deadline", wl.Name), processName)
+ deadlineExceededEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("deployment %q exceeded its progress deadline", wl.Name), processName, "")
recorder.AnnotatedEventf(app, deadlineExceededEvent.Annotations, v1.EventTypeWarning, deadlineExceededEvent.Reason, deadlineExceededEvent.Description)
return errors.Errorf("deployment %q exceeded its progress deadline", wl.Name)
}
}
if oldUpdatedReplicas != wl.UpdatedReplicas {
- unitsCreatedEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d of %d new units created", wl.UpdatedReplicas, specReplicas), processName)
+ unitsCreatedEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d of %d new units created", wl.UpdatedReplicas, specReplicas), processName, "")
recorder.AnnotatedEventf(app, unitsCreatedEvent.Annotations, v1.EventTypeNormal, unitsCreatedEvent.Reason, unitsCreatedEvent.Description)
}

if healthcheckTimeout == nil && wl.UpdatedReplicas == specReplicas {
- err := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
+ _, err := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
if err == nil {
healthcheckTimeout = time.After(maxWaitTimeDuration)
- healthcheckEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("waiting healthcheck on %d created units", specReplicas), processName)
+ healthcheckEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("waiting healthcheck on %d created units", specReplicas), processName, "")
recorder.AnnotatedEventf(app, healthcheckEvent.Annotations, v1.EventTypeNormal, healthcheckEvent.Reason, healthcheckEvent.Description)
}
}

readyUnits := wl.ReadyReplicas
if oldReadyUnits != readyUnits && readyUnits >= 0 {
- unitsReadyEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d of %d new units ready", readyUnits, specReplicas), processName)
+ unitsReadyEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d of %d new units ready", readyUnits, specReplicas), processName, "")
recorder.AnnotatedEventf(app, unitsReadyEvent.Annotations, v1.EventTypeNormal, unitsReadyEvent.Reason, unitsReadyEvent.Description)
}

pendingTermination := wl.Replicas - wl.UpdatedReplicas
if oldPendingTermination != pendingTermination && pendingTermination > 0 {
- pendingTerminationEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d old units pending termination", pendingTermination), processName)
+ pendingTerminationEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d old units pending termination", pendingTermination), processName, "")
recorder.AnnotatedEventf(app, pendingTerminationEvent.Annotations, v1.EventTypeNormal, pendingTerminationEvent.Reason, pendingTerminationEvent.Description)
}

@@ -583,13 +583,15 @@ func (r *AppReconciler) watchFunc(ctx context.Context, cleanup cleanupFunc, app
recorder.AnnotatedEventf(app, appDeploymentEvent.Annotations, v1.EventTypeNormal, ketchv1.AppReconcileUpdate, appDeploymentEvent.Description)
}
case <-healthcheckTimeout:
+ podName, _ := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
err = createDeployTimeoutError(ctx, cli.k8sClient, app, time.Since(now), cli.workloadNamespace, app.GroupVersionKind().Group, "healthcheck")
- healthcheckTimeoutEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error waiting for healthcheck: %s", err.Error()), processName)
+ healthcheckTimeoutEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error waiting for healthcheck: %s", err.Error()), processName, podName)
recorder.AnnotatedEventf(app, healthcheckTimeoutEvent.Annotations, v1.EventTypeWarning, healthcheckTimeoutEvent.Reason, healthcheckTimeoutEvent.Description)
return err
case <-timeout:
+ podName, _ := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
err = createDeployTimeoutError(ctx, cli.k8sClient, app, time.Since(now), cli.workloadNamespace, app.GroupVersionKind().Group, "full rollout")
- timeoutEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("deployment timeout: %s", err.Error()), processName)
+ timeoutEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("deployment timeout: %s", err.Error()), processName, podName)
recorder.AnnotatedEventf(app, timeoutEvent.Annotations, v1.EventTypeWarning, timeoutEvent.Reason, timeoutEvent.Description)
return err
case <-ctx.Done():
@@ -604,7 +606,8 @@ func (r *AppReconciler) watchFunc(ctx context.Context, cleanup cleanupFunc, app
return err
}
if err != nil && !k8sErrors.IsNotFound(err) {
- deploymentErrorEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error getting deployments: %s", err.Error()), processName)
+ podName, _ := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
+ deploymentErrorEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error getting deployments: %s", err.Error()), processName, podName)
recorder.AnnotatedEventf(app, deploymentErrorEvent.Annotations, v1.EventTypeWarning, deploymentErrorEvent.Reason, deploymentErrorEvent.Description)
return err
}
@@ -614,7 +617,7 @@ func (r *AppReconciler) watchFunc(ctx context.Context, cleanup cleanupFunc, app
}

outcome := ketchv1.AppReconcileOutcome{AppName: app.Name, DeploymentCount: wl.ReadyReplicas}
- outcomeEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileComplete, outcome.String(), processName)
+ outcomeEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileComplete, outcome.String(), processName, "")
recorder.AnnotatedEventf(app, outcomeEvent.Annotations, v1.EventTypeNormal, outcomeEvent.Reason, outcomeEvent.Description)
return nil
}
@@ -650,7 +653,7 @@ func appDeploymentEventFromWatchEvent(watchEvent watch.Event, app *ketchv1.App,
}

// newAppDeploymentEvent creates a new AppDeploymentEvent, creating Annotations for use when emitting App Events.
- func newAppDeploymentEvent(app *ketchv1.App, reason, desc, processName string) *ketchv1.AppDeploymentEvent {
+ func newAppDeploymentEvent(app *ketchv1.App, reason, desc, processName, podName string) *ketchv1.AppDeploymentEvent {
var version int
if len(app.Spec.Deployments) > 0 {
version = int(app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
@@ -667,6 +670,7 @@ func newAppDeploymentEvent(app *ketchv1.App, reason, desc, processName string) *
ketchv1.DeploymentAnnotationEventName: reason,
ketchv1.DeploymentAnnotationDescription: desc,
ketchv1.DeploymentAnnotationProcessName: processName,
+ ketchv1.DeploymentAnnotationPodErrorName: podName,
},
}
}
@@ -684,7 +688,6 @@ func createDeployTimeoutError(ctx context.Context, cli kubernetes.Interface, app
deploymentVersion = int(app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
}
opts := metav1.ListOptions{
- FieldSelector: "involvedObject.kind=Pod",
LabelSelector: fmt.Sprintf("%s/app-name=%s,%s/app-deployment-version=%d", group, app.Name, group, deploymentVersion),
}
pods, err := cli.CoreV1().Pods(app.GetNamespace()).List(ctx, opts)
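One dropped line worth calling out: `FieldSelector: "involvedObject.kind=Pod"`. Field selectors on `involvedObject.*` belong to Event resources, not Pod lists, so that selector was almost certainly rejected or ignored on this pod list, which would explain its removal. For contrast, a hedged, self-contained sketch of where such a selector does apply — the kubeconfig path and `default` namespace are assumptions, not taken from this PR:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption: kubeconfig at the conventional home location.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	cli := kubernetes.NewForConfigOrDie(cfg)

	// involvedObject.* field selectors are valid on Events, not Pods.
	events, err := cli.CoreV1().Events("default").List(context.Background(),
		metav1.ListOptions{FieldSelector: "involvedObject.kind=Pod"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("found %d pod-related events\n", len(events.Items))
}
```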
@@ -737,13 +740,14 @@ func timeoutExpired(t *metav1.Time, now time.Time) bool {
}

// checkPodStatus checks whether all pods for a deployment are running or not.
- func checkPodStatus(group string, c client.Client, appName string, depVersion ketchv1.DeploymentVersion) error {
+ // If a Pod is found in a non-healthy state, its name is returned
+ func checkPodStatus(group string, c client.Client, appName string, depVersion ketchv1.DeploymentVersion) (string, error) {
if c == nil {
return errors.New("client must be non-nil")
return "", errors.New("client must be non-nil")
}

if len(appName) == 0 || depVersion <= 0 {
return errors.New("invalid app specifications")
return "", errors.New("invalid app specifications")
}

// podList contains list of Pods matching the specified labels below
@@ -756,27 +760,27 @@ func checkPodStatus(group string, c client.Client, appName string, depVersion ke
}

if err := c.List(context.Background(), podList, listOpts...); err != nil {
- return err
+ return "", err
}

// check if all pods are running for the deployment
for _, pod := range podList.Items {
// check if pod have voluntarily terminated with a container exit code of 0
if pod.Status.Phase == v1.PodSucceeded {
- return nil
+ return "", nil
}

if pod.Status.Phase != v1.PodRunning {
return errors.New("all pods are not running")
return pod.Name, errors.New("all pods are not running")
}

for _, c := range pod.Status.Conditions {
if c.Status != v1.ConditionTrue {
return errors.New("all pods are not in healthy state")
return pod.Name, errors.New("all pods are not in healthy state")
}
}
}
- return nil
+ return "", nil
}
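The shape of the new contract, in one runnable sketch: callers get the first unhealthy pod's name alongside the error, or an empty string when no specific pod can be blamed. The types here are simplified stand-ins for illustration, not the ketch or Kubernetes ones:

```go
package main

import (
	"errors"
	"fmt"
)

// pod is a simplified stand-in for a Kubernetes pod (assumption for illustration).
type pod struct {
	Name  string
	Phase string // "Running", "Succeeded", "Pending", ...
}

// checkPods mirrors the new checkPodStatus contract from the diff: the name of
// the first unhealthy pod is returned alongside the error, "" when all is well.
func checkPods(pods []pod) (string, error) {
	for _, p := range pods {
		// Mirrors the diff: a pod that terminated voluntarily with exit code 0
		// short-circuits as success.
		if p.Phase == "Succeeded" {
			return "", nil
		}
		if p.Phase != "Running" {
			return p.Name, errors.New("all pods are not running")
		}
	}
	return "", nil
}

func main() {
	name, err := checkPods([]pod{{Name: "my-app-5", Phase: "Pending"}})
	fmt.Println(name, err) // my-app-5 all pods are not running
}
```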

func (r *AppReconciler) deleteChart(ctx context.Context, app *ketchv1.App) error {
internal/controllers/app_controller_test.go (24 changes: 15 additions & 9 deletions)
@@ -560,12 +560,13 @@ func Test_checkPodStatus(t *testing.T) {
}
}
tests := []struct {
- name string
- pods []runtime.Object
- appName string
- depVersion ketchv1.DeploymentVersion
- group string
- wantErr string
+ name string
+ pods []runtime.Object
+ appName string
+ depVersion ketchv1.DeploymentVersion
+ group string
+ expectedPod string
+ wantErr string
}{
{
name: "pod in Pending state",
@@ -575,7 +576,8 @@
pods: []runtime.Object{
createPod("theketch.io", "my-app", "5", v1.PodStatus{Phase: v1.PodPending}),
},
- wantErr: `all pods are not running`,
+ expectedPod: "my-app-5",
+ wantErr: `all pods are not running`,
},
{
name: "pod in Pending state but group doesn't match",
@@ -591,12 +593,13 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := ctrlFake.NewClientBuilder().WithScheme(clientgoscheme.Scheme).WithRuntimeObjects(tt.pods...).Build()
- err := checkPodStatus(tt.group, cli, tt.appName, tt.depVersion)
+ podName, err := checkPodStatus(tt.group, cli, tt.appName, tt.depVersion)
if len(tt.wantErr) > 0 {
require.NotNil(t, err)
require.Equal(t, tt.wantErr, err.Error())
return
}
+ require.Equal(t, podName, tt.expectedPod)
require.Nil(t, err)
})
}
@@ -692,12 +695,14 @@ func TestAppDeloymentEvent(t *testing.T) {
reason string
desc string
processName string
+ podName string
expected *ketchv1.AppDeploymentEvent
}{
{
reason: "test reason",
desc: "test message",
processName: "test process",
podName: "test-pod",
expected: &ketchv1.AppDeploymentEvent{
Name: app.Name,
DeploymentVersion: 2,
@@ -710,13 +715,14 @@
ketchv1.DeploymentAnnotationEventName: "test reason",
ketchv1.DeploymentAnnotationDescription: "test message",
ketchv1.DeploymentAnnotationProcessName: "test process",
ketchv1.DeploymentAnnotationPodErrorName: "test-pod",
},
},
},
}
for i, tc := range tests {
t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) {
- ev := newAppDeploymentEvent(app, tc.reason, tc.desc, tc.processName)
+ ev := newAppDeploymentEvent(app, tc.reason, tc.desc, tc.processName, tc.podName)
require.Equal(t, tc.expected, ev)
})
}