Merge pull request #250 from theketchio/shipa-2788
[SHIPA-2788] Annotates events with Pod Name in the event of an Error type event
  • Loading branch information
kavinaravind authored May 20, 2022
2 parents a16fc36 + 5d5e7bb commit decbc22
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 33 deletions.
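
In short: when the reconciler emits an error-type deployment event, it now also looks up the failing pod for the latest deployment version and records that pod's name under the new deployment.shipa.io/pod-error-name annotation. A minimal consumer-side sketch of reading that annotation back off cluster events — the namespace and kubeconfig path are illustrative assumptions, only the annotation key comes from this commit:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumes a local kubeconfig; in-cluster config would work just as well.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	cli, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// "my-app-namespace" stands in for the namespace Ketch deploys the app into.
	events, err := cli.CoreV1().Events("my-app-namespace").List(context.Background(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, ev := range events.Items {
		// Pick out the events this change annotates with the failing pod's name.
		if pod, ok := ev.Annotations["deployment.shipa.io/pod-error-name"]; ok && pod != "" {
			fmt.Printf("%s: %s (failing pod: %s)\n", ev.Reason, ev.Message, pod)
		}
	}
}
```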
4 changes: 3 additions & 1 deletion internal/api/v1beta1/app_types.go
@@ -1009,7 +1009,8 @@ const (
DeploymentAnnotationInvolvedObjectName = "deployment.shipa.io/involved-object-name"
DeploymentAnnotationInvolvedObjectFieldPath = "deployment.shipa.io/involved-object-field-path"
DeploymentAnnotationSourceHost = "deployment.shipa.io/source-host"
- DeploymentAnnotationSourceComponent = "deployment.shipa.io/cource-component"
+ DeploymentAnnotationSourceComponent = "deployment.shipa.io/source-component"
+ DeploymentAnnotationPodErrorName = "deployment.shipa.io/pod-error-name"

AppReconcileStarted = "AppReconcileStarted"
AppReconcileComplete = "AppReconcileComplete"
@@ -1036,6 +1037,7 @@ type AppDeploymentEvent struct {
// Source is the source of an incoming event
SourceHost string
SourceComponent string
+ PodErrorName string
}

func AppDeploymentEventFromAnnotations(annotations map[string]string) *AppDeploymentEvent {
50 changes: 27 additions & 23 deletions internal/controllers/app_controller.go
@@ -372,7 +372,7 @@ func (r *AppReconciler) reconcile(ctx context.Context, app *ketchv1.App, logger
}

// retry until all pods for canary deployment comes to running state.
- if err := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[1].Version); err != nil {
+ if _, err := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[1].Version); len(app.Spec.Deployments) > 1 && err != nil {

if !timeoutExpired(app.Spec.Canary.Started, r.Now()) {
return appReconcileResult{
@@ -476,7 +476,7 @@ func (r *AppReconciler) watchDeployEvents(ctx context.Context, app *ketchv1.App,
if strings.Contains(err.Error(), "unknown (get events)") {
err = errors.WithMessagef(err, "assure clusterrole 'manager-role' has 'watch' permissions on event resources")
}
- watchErrorEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error watching deployments for workload %s: %s", wl.Name, err.Error()), process.Name)
+ watchErrorEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error watching deployments for workload %s: %s", wl.Name, err.Error()), process.Name, "")
recorder.AnnotatedEventf(app, watchErrorEvent.Annotations, v1.EventTypeWarning, watchErrorEvent.Reason, watchErrorEvent.Description)
return err
}
@@ -509,7 +509,7 @@ func (r *AppReconciler) watchDeployEvents(ctx context.Context, app *ketchv1.App,
// assign current cancelFunc and cancel the previous one
cleanup := r.CancelMap.replaceAndCancelPrevious(wl.Name, cancel)

- reconcileStartedEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileStarted, fmt.Sprintf("Updating units [%s]", process.Name), process.Name)
+ reconcileStartedEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileStarted, fmt.Sprintf("Updating units [%s]", process.Name), process.Name, "")
recorder.AnnotatedEventf(app, reconcileStartedEvent.Annotations, v1.EventTypeNormal, reconcileStartedEvent.Reason, reconcileStartedEvent.Description)
go r.watchFunc(ctx, cleanup, app, process.Name, recorder, watcher, cli, wl, timeout)
return nil
@@ -533,34 +533,34 @@ func (r *AppReconciler) watchFunc(ctx context.Context, cleanup cleanupFunc, app
for i := range wl.Conditions {
c := wl.Conditions[i]
if c.Type == DeploymentProgressing && c.Reason == deadlineExeceededProgressCond {
- deadlineExceededEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("deployment %q exceeded its progress deadline", wl.Name), processName)
+ deadlineExceededEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("deployment %q exceeded its progress deadline", wl.Name), processName, "")
recorder.AnnotatedEventf(app, deadlineExceededEvent.Annotations, v1.EventTypeWarning, deadlineExceededEvent.Reason, deadlineExceededEvent.Description)
return errors.Errorf("deployment %q exceeded its progress deadline", wl.Name)
}
}
if oldUpdatedReplicas != wl.UpdatedReplicas {
- unitsCreatedEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d of %d new units created", wl.UpdatedReplicas, specReplicas), processName)
+ unitsCreatedEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d of %d new units created", wl.UpdatedReplicas, specReplicas), processName, "")
recorder.AnnotatedEventf(app, unitsCreatedEvent.Annotations, v1.EventTypeNormal, unitsCreatedEvent.Reason, unitsCreatedEvent.Description)
}

if healthcheckTimeout == nil && wl.UpdatedReplicas == specReplicas {
- err := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
+ _, err := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
if err == nil {
healthcheckTimeout = time.After(maxWaitTimeDuration)
- healthcheckEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("waiting healthcheck on %d created units", specReplicas), processName)
+ healthcheckEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("waiting healthcheck on %d created units", specReplicas), processName, "")
recorder.AnnotatedEventf(app, healthcheckEvent.Annotations, v1.EventTypeNormal, healthcheckEvent.Reason, healthcheckEvent.Description)
}
}

readyUnits := wl.ReadyReplicas
if oldReadyUnits != readyUnits && readyUnits >= 0 {
- unitsReadyEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d of %d new units ready", readyUnits, specReplicas), processName)
+ unitsReadyEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d of %d new units ready", readyUnits, specReplicas), processName, "")
recorder.AnnotatedEventf(app, unitsReadyEvent.Annotations, v1.EventTypeNormal, unitsReadyEvent.Reason, unitsReadyEvent.Description)
}

pendingTermination := wl.Replicas - wl.UpdatedReplicas
if oldPendingTermination != pendingTermination && pendingTermination > 0 {
- pendingTerminationEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d old units pending termination", pendingTermination), processName)
+ pendingTerminationEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d old units pending termination", pendingTermination), processName, "")
recorder.AnnotatedEventf(app, pendingTerminationEvent.Annotations, v1.EventTypeNormal, pendingTerminationEvent.Reason, pendingTerminationEvent.Description)
}

@@ -583,13 +583,15 @@ func (r *AppReconciler) watchFunc(ctx context.Context, cleanup cleanupFunc, app
recorder.AnnotatedEventf(app, appDeploymentEvent.Annotations, v1.EventTypeNormal, ketchv1.AppReconcileUpdate, appDeploymentEvent.Description)
}
case <-healthcheckTimeout:
+ podName, _ := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
err = createDeployTimeoutError(ctx, cli.k8sClient, app, time.Since(now), cli.workloadNamespace, app.GroupVersionKind().Group, "healthcheck")
- healthcheckTimeoutEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error waiting for healthcheck: %s", err.Error()), processName)
+ healthcheckTimeoutEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error waiting for healthcheck: %s", err.Error()), processName, podName)
recorder.AnnotatedEventf(app, healthcheckTimeoutEvent.Annotations, v1.EventTypeWarning, healthcheckTimeoutEvent.Reason, healthcheckTimeoutEvent.Description)
return err
case <-timeout:
+ podName, _ := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
err = createDeployTimeoutError(ctx, cli.k8sClient, app, time.Since(now), cli.workloadNamespace, app.GroupVersionKind().Group, "full rollout")
- timeoutEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("deployment timeout: %s", err.Error()), processName)
+ timeoutEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("deployment timeout: %s", err.Error()), processName, podName)
recorder.AnnotatedEventf(app, timeoutEvent.Annotations, v1.EventTypeWarning, timeoutEvent.Reason, timeoutEvent.Description)
return err
case <-ctx.Done():
@@ -604,7 +606,8 @@ func (r *AppReconciler) watchFunc(ctx context.Context, cleanup cleanupFunc, app
return err
}
if err != nil && !k8sErrors.IsNotFound(err) {
- deploymentErrorEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error getting deployments: %s", err.Error()), processName)
+ podName, _ := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
+ deploymentErrorEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error getting deployments: %s", err.Error()), processName, podName)
recorder.AnnotatedEventf(app, deploymentErrorEvent.Annotations, v1.EventTypeWarning, deploymentErrorEvent.Reason, deploymentErrorEvent.Description)
return err
}
@@ -614,7 +617,7 @@ func (r *AppReconciler) watchFunc(ctx context.Context, cleanup cleanupFunc, app
}

outcome := ketchv1.AppReconcileOutcome{AppName: app.Name, DeploymentCount: wl.ReadyReplicas}
- outcomeEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileComplete, outcome.String(), processName)
+ outcomeEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileComplete, outcome.String(), processName, "")
recorder.AnnotatedEventf(app, outcomeEvent.Annotations, v1.EventTypeNormal, outcomeEvent.Reason, outcomeEvent.Description)
return nil
}
@@ -650,7 +653,7 @@ func appDeploymentEventFromWatchEvent(watchEvent watch.Event, app *ketchv1.App,
}

// newAppDeploymentEvent creates a new AppDeploymentEvent, creating Annotations for use when emitting App Events.
- func newAppDeploymentEvent(app *ketchv1.App, reason, desc, processName string) *ketchv1.AppDeploymentEvent {
+ func newAppDeploymentEvent(app *ketchv1.App, reason, desc, processName, podName string) *ketchv1.AppDeploymentEvent {
var version int
if len(app.Spec.Deployments) > 0 {
version = int(app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
@@ -667,6 +670,7 @@ func newAppDeploymentEvent(app *ketchv1.App, reason, desc, processName string) *
ketchv1.DeploymentAnnotationEventName: reason,
ketchv1.DeploymentAnnotationDescription: desc,
ketchv1.DeploymentAnnotationProcessName: processName,
+ ketchv1.DeploymentAnnotationPodErrorName: podName,
},
}
}
@@ -684,7 +688,6 @@ func createDeployTimeoutError(ctx context.Context, cli kubernetes.Interface, app
deploymentVersion = int(app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
}
opts := metav1.ListOptions{
- FieldSelector: "involvedObject.kind=Pod",
LabelSelector: fmt.Sprintf("%s/app-name=%s,%s/app-deployment-version=%d", group, app.Name, group, deploymentVersion),
}
pods, err := cli.CoreV1().Pods(app.GetNamespace()).List(ctx, opts)
@@ -737,13 +740,14 @@ func timeoutExpired(t *metav1.Time, now time.Time) bool {
}

// checkPodStatus checks whether all pods for a deployment are running or not.
- func checkPodStatus(group string, c client.Client, appName string, depVersion ketchv1.DeploymentVersion) error {
+ // If a Pod is found in a non-healthy state, it's name is returned
+ func checkPodStatus(group string, c client.Client, appName string, depVersion ketchv1.DeploymentVersion) (string, error) {
if c == nil {
return errors.New("client must be non-nil")
return "", errors.New("client must be non-nil")
}

if len(appName) == 0 || depVersion <= 0 {
return errors.New("invalid app specifications")
return "", errors.New("invalid app specifications")
}

// podList contains list of Pods matching the specified labels below
@@ -756,27 +760,27 @@ func checkPodStatus(group string, c client.Client, appName string, depVersion ke
}

if err := c.List(context.Background(), podList, listOpts...); err != nil {
- return err
+ return "", err
}

// check if all pods are running for the deployment
for _, pod := range podList.Items {
// check if pod have voluntarily terminated with a container exit code of 0
if pod.Status.Phase == v1.PodSucceeded {
- return nil
+ return "", nil
}

if pod.Status.Phase != v1.PodRunning {
return errors.New("all pods are not running")
return pod.Name, errors.New("all pods are not running")
}

for _, c := range pod.Status.Conditions {
if c.Status != v1.ConditionTrue {
return errors.New("all pods are not in healthy state")
return pod.Name, errors.New("all pods are not in healthy state")
}
}
}
- return nil
+ return "", nil
}

func (r *AppReconciler) deleteChart(ctx context.Context, app *ketchv1.App) error {
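
For context on the new return value: checkPodStatus selects pods by the group-scoped app-name and app-deployment-version labels and reports the first pod it finds in a non-running or unhealthy state. Below is a hedged, standalone sketch of the same selection using a plain client-go clientset rather than the controller-runtime client — the namespace, app name, and version are placeholders, and "theketch.io" is the group value used by the tests that follow:

```go
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// firstUnhealthyPod mirrors the selection and health check performed by the
// updated checkPodStatus: list pods by the group-scoped labels and return the
// name of the first pod that is not running or not healthy.
func firstUnhealthyPod(ctx context.Context, cli kubernetes.Interface, namespace, group, appName string, version int) (string, error) {
	pods, err := cli.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: fmt.Sprintf("%s/app-name=%s,%s/app-deployment-version=%d", group, appName, group, version),
	})
	if err != nil {
		return "", err
	}
	for _, pod := range pods.Items {
		// A pod that exited voluntarily with code 0 is treated as fine,
		// matching the PodSucceeded branch above.
		if pod.Status.Phase == corev1.PodSucceeded {
			return "", nil
		}
		if pod.Status.Phase != corev1.PodRunning {
			return pod.Name, nil
		}
		for _, cond := range pod.Status.Conditions {
			if cond.Status != corev1.ConditionTrue {
				return pod.Name, nil
			}
		}
	}
	return "", nil
}

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	cli, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	// Placeholder namespace, app, and version values for illustration only.
	name, err := firstUnhealthyPod(context.Background(), cli, "my-app-namespace", "theketch.io", "my-app", 5)
	if err != nil {
		panic(err)
	}
	fmt.Println("first unhealthy pod:", name)
}
```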
24 changes: 15 additions & 9 deletions internal/controllers/app_controller_test.go
@@ -560,12 +560,13 @@ func Test_checkPodStatus(t *testing.T) {
}
}
tests := []struct {
- name string
- pods []runtime.Object
- appName string
- depVersion ketchv1.DeploymentVersion
- group string
- wantErr string
+ name string
+ pods []runtime.Object
+ appName string
+ depVersion ketchv1.DeploymentVersion
+ group string
+ expectedPod string
+ wantErr string
}{
{
name: "pod in Pending state",
@@ -575,7 +576,8 @@ pods: []runtime.Object{
pods: []runtime.Object{
createPod("theketch.io", "my-app", "5", v1.PodStatus{Phase: v1.PodPending}),
},
- wantErr: `all pods are not running`,
+ expectedPod: "my-app-5",
+ wantErr: `all pods are not running`,
},
{
name: "pod in Pending state but group doesn't match",
@@ -591,12 +593,13 @@ pods: []runtime.Object{
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := ctrlFake.NewClientBuilder().WithScheme(clientgoscheme.Scheme).WithRuntimeObjects(tt.pods...).Build()
- err := checkPodStatus(tt.group, cli, tt.appName, tt.depVersion)
+ podName, err := checkPodStatus(tt.group, cli, tt.appName, tt.depVersion)
if len(tt.wantErr) > 0 {
require.NotNil(t, err)
require.Equal(t, tt.wantErr, err.Error())
return
}
+ require.Equal(t, podName, tt.expectedPod)
require.Nil(t, err)
})
}
@@ -692,12 +695,14 @@ func TestAppDeloymentEvent(t *testing.T) {
reason string
desc string
processName string
+ podName string
expected *ketchv1.AppDeploymentEvent
}{
{
reason: "test reason",
desc: "test message",
processName: "test process",
+ podName: "test-pod",
expected: &ketchv1.AppDeploymentEvent{
Name: app.Name,
DeploymentVersion: 2,
@@ -710,13 +715,14 @@ func TestAppDeloymentEvent(t *testing.T) {
ketchv1.DeploymentAnnotationEventName: "test reason",
ketchv1.DeploymentAnnotationDescription: "test message",
ketchv1.DeploymentAnnotationProcessName: "test process",
+ ketchv1.DeploymentAnnotationPodErrorName: "test-pod",
},
},
},
}
for i, tc := range tests {
t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) {
- ev := newAppDeploymentEvent(app, tc.reason, tc.desc, tc.processName)
+ ev := newAppDeploymentEvent(app, tc.reason, tc.desc, tc.processName, tc.podName)
require.Equal(t, tc.expected, ev)
})
}
