diff --git a/internal/controller/appwrapper/appwrapper_controller.go b/internal/controller/appwrapper/appwrapper_controller.go
index b0ea97e..6dbd24a 100644
--- a/internal/controller/appwrapper/appwrapper_controller.go
+++ b/internal/controller/appwrapper/appwrapper_controller.go
@@ -107,13 +107,14 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	if !aw.DeletionTimestamp.IsZero() {
 		if controllerutil.ContainsFinalizer(aw, AppWrapperFinalizer) {
 			statusUpdated := false
+			orig := copyForStatusPatch(aw)
 			if meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.ResourcesDeployed)) {
 				if !r.deleteComponents(ctx, aw) {
 					// one or more components are still terminating
 					if aw.Status.Phase != workloadv1beta2.AppWrapperTerminating {
 						// Set Phase for better UX, but ignore errors. We still want to requeue after 5 seconds (not immediately)
 						aw.Status.Phase = workloadv1beta2.AppWrapperTerminating
-						_ = r.Status().Update(ctx, aw)
+						_ = r.Status().Patch(ctx, aw, client.MergeFrom(orig))
 					}
 					return ctrl.Result{RequeueAfter: 5 * time.Second}, nil // check after a short while
 				}
@@ -136,7 +137,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 				statusUpdated = true
 			}
 			if statusUpdated {
-				if err := r.Status().Update(ctx, aw); err != nil {
+				if err := r.Status().Patch(ctx, aw, client.MergeFrom(orig)); err != nil {
 					return ctrl.Result{}, err
 				}
 			}
@@ -160,11 +161,12 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			}
 		}

+		orig := copyForStatusPatch(aw)
 		if err := utils.EnsureComponentStatusInitialized(aw); err != nil {
 			return ctrl.Result{}, err
 		}
-		return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperSuspended)
+		return ctrl.Result{}, r.transitionToPhase(ctx, orig, aw, workloadv1beta2.AppWrapperSuspended)

 	case workloadv1beta2.AppWrapperSuspended: // no components deployed
 		if aw.Spec.Suspend {
@@ -172,6 +174,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		}

 		// begin deployment
+		orig := copyForStatusPatch(aw)
 		meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
 			Type:   string(workloadv1beta2.QuotaReserved),
 			Status: metav1.ConditionTrue,
@@ -196,13 +199,14 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			Reason:  string(workloadv1beta2.AppWrapperResuming),
 			Message: "Suspend is false",
 		})
-		return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperResuming)
+		return ctrl.Result{}, r.transitionToPhase(ctx, orig, aw, workloadv1beta2.AppWrapperResuming)

 	case workloadv1beta2.AppWrapperResuming: // deploying components
 		if aw.Spec.Suspend {
-			return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperSuspending) // abort deployment
+			return ctrl.Result{}, r.transitionToPhase(ctx, copyForStatusPatch(aw), aw, workloadv1beta2.AppWrapperSuspending) // abort deployment
 		}
-		err, fatal := r.createComponents(ctx, aw)
+		err, fatal := r.createComponents(ctx, aw) // NOTE: createComponents applies patches to aw.Status incrementally as resources are created
+		orig := copyForStatusPatch(aw)
 		if err != nil {
 			if !fatal {
 				startTime := meta.FindStatusCondition(aw.Status.Conditions, string(workloadv1beta2.ResourcesDeployed)).LastTransitionTime
@@ -221,16 +225,17 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			})
 			r.Recorder.Event(aw, v1.EventTypeNormal, string(workloadv1beta2.Unhealthy), "CreateFailed: "+detailMsg)
 			if fatal {
-				return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperFailed) // always move to failed on fatal error
+				return ctrl.Result{}, r.transitionToPhase(ctx, orig, aw, workloadv1beta2.AppWrapperFailed) // always move to failed on fatal error
 			} else {
-				return r.resetOrFail(ctx, aw, false, 1)
+				return ctrl.Result{}, r.resetOrFail(ctx, orig, aw, false, 1)
 			}
 		}
-		return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperRunning)
+		return ctrl.Result{}, r.transitionToPhase(ctx, orig, aw, workloadv1beta2.AppWrapperRunning)

 	case workloadv1beta2.AppWrapperRunning: // components deployed
+		orig := copyForStatusPatch(aw)
 		if aw.Spec.Suspend {
-			return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperSuspending) // begin undeployment
+			return ctrl.Result{}, r.transitionToPhase(ctx, orig, aw, workloadv1beta2.AppWrapperSuspending) // begin undeployment
 		}

 		// Gather status information at the Component and Pod level.
@@ -253,11 +258,11 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 				Message: detailMsg,
 			})
 			r.Recorder.Event(aw, v1.EventTypeNormal, string(workloadv1beta2.Unhealthy), "MissingComponent: "+detailMsg)
-			return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperFailed)
+			return ctrl.Result{}, r.transitionToPhase(ctx, orig, aw, workloadv1beta2.AppWrapperFailed)
 		}

 		// If a component's controller has put it into a failed state, we do not need
-		// to allow any further grace period. The situation will not self-correct.
+		// to allow a grace period. The situation will not self-correct.
 		detailMsg = fmt.Sprintf("Found %v failed components", compStatus.failed)
 		if compStatus.failed > 0 {
 			meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
@@ -267,7 +272,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 				Message: detailMsg,
 			})
 			r.Recorder.Event(aw, v1.EventTypeNormal, string(workloadv1beta2.Unhealthy), "FailedComponent: "+detailMsg)
-			return r.resetOrFail(ctx, aw, podStatus.terminalFailure, 1)
+			return ctrl.Result{}, r.resetOrFail(ctx, orig, aw, podStatus.terminalFailure, 1)
 		}

 		// Handle Success
@@ -285,7 +290,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 				Reason:  string(workloadv1beta2.AppWrapperSucceeded),
 				Message: msg,
 			})
-			return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperSucceeded)
+			return ctrl.Result{}, r.transitionToPhase(ctx, orig, aw, workloadv1beta2.AppWrapperSucceeded)
 		}

 		// Handle Failed Pods
@@ -303,10 +308,10 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			now := time.Now()
 			deadline := whenDetected.Add(gracePeriod)
 			if now.Before(deadline) {
-				return requeueAfter(deadline.Sub(now), r.Status().Update(ctx, aw))
+				return requeueAfter(deadline.Sub(now), r.Status().Patch(ctx, aw, client.MergeFrom(orig)))
 			} else {
 				r.Recorder.Eventf(aw, v1.EventTypeNormal, string(workloadv1beta2.Unhealthy), "FoundFailedPods: %v failed pods", podStatus.failed)
-				return r.resetOrFail(ctx, aw, podStatus.terminalFailure, 1)
+				return ctrl.Result{}, r.resetOrFail(ctx, orig, aw, podStatus.terminalFailure, 1)
 			}
 		}
@@ -320,7 +325,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 				Message: detailMsg,
 			})
 			r.Recorder.Event(aw, v1.EventTypeNormal, string(workloadv1beta2.Unhealthy), detailMsg)
-			return r.resetOrFail(ctx, aw, false, 0) // Autopilot triggered evacuation does not increment retry count
+			return ctrl.Result{}, r.resetOrFail(ctx, orig, aw, false, 0) // Autopilot triggered evacuation does not increment retry count
 		}

 		clearCondition(aw, workloadv1beta2.Unhealthy, "FoundNoFailedPods", "")
@@ -332,7 +337,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 				Reason:  "SufficientPodsReady",
 				Message: fmt.Sprintf("%v pods running; %v pods succeeded", podStatus.running, podStatus.succeeded),
 			})
-			return requeueAfter(time.Minute, r.Status().Update(ctx, aw))
+			return requeueAfter(time.Minute, r.Status().Patch(ctx, aw, client.MergeFrom(orig)))
 		}

 		// Not ready yet; either continue to wait or giveup if the warmup period has expired
@@ -346,7 +351,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			graceDuration = r.admissionGraceDuration(ctx, aw)
 		}
 		if time.Now().Before(whenDeployed.Add(graceDuration)) {
-			return requeueAfter(5*time.Second, r.Status().Update(ctx, aw))
+			return requeueAfter(5*time.Second, r.Status().Patch(ctx, aw, client.MergeFrom(orig)))
 		} else {
 			meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
 				Type:   string(workloadv1beta2.Unhealthy),
@@ -355,14 +360,15 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 				Message: podDetailsMessage,
 			})
 			r.Recorder.Event(aw, v1.EventTypeNormal, string(workloadv1beta2.Unhealthy), "InsufficientPodsReady: "+podDetailsMessage)
-			return r.resetOrFail(ctx, aw, podStatus.terminalFailure, 1)
+			return ctrl.Result{}, r.resetOrFail(ctx, orig, aw, podStatus.terminalFailure, 1)
 		}

 	case workloadv1beta2.AppWrapperSuspending: // undeploying components
+		orig := copyForStatusPatch(aw)
 		// finish undeploying components irrespective of desired state (suspend bit)
 		if meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.ResourcesDeployed)) {
 			if !r.deleteComponents(ctx, aw) {
-				return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
+				return requeueAfter(5*time.Second, r.Status().Patch(ctx, aw, client.MergeFrom(orig)))
 			}
 			meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
 				Type:   string(workloadv1beta2.ResourcesDeployed),
@@ -379,17 +385,18 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		})
 		clearCondition(aw, workloadv1beta2.PodsReady, string(workloadv1beta2.AppWrapperSuspended), "")
 		clearCondition(aw, workloadv1beta2.Unhealthy, string(workloadv1beta2.AppWrapperSuspended), "")
-		return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperSuspended)
+		return ctrl.Result{}, r.transitionToPhase(ctx, orig, aw, workloadv1beta2.AppWrapperSuspended)

 	case workloadv1beta2.AppWrapperResetting:
+		orig := copyForStatusPatch(aw)
 		if aw.Spec.Suspend {
-			return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperSuspending) // Suspending trumps Resetting
+			return ctrl.Result{}, r.transitionToPhase(ctx, orig, aw, workloadv1beta2.AppWrapperSuspending) // Suspending trumps Resetting
 		}

 		clearCondition(aw, workloadv1beta2.PodsReady, string(workloadv1beta2.AppWrapperResetting), "")
 		if meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.ResourcesDeployed)) {
 			if !r.deleteComponents(ctx, aw) {
-				return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
+				return requeueAfter(5*time.Second, r.Status().Patch(ctx, aw, client.MergeFrom(orig)))
 			}
 			meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
 				Type:   string(workloadv1beta2.ResourcesDeployed),
@@ -405,7 +412,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		now := time.Now()
 		deadline := whenReset.Add(pauseDuration)
 		if now.Before(deadline) {
-			return requeueAfter(deadline.Sub(now), r.Status().Update(ctx, aw))
+			return requeueAfter(deadline.Sub(now), r.Status().Patch(ctx, aw, client.MergeFrom(orig)))
 		}

 		meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
@@ -414,7 +421,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			Reason:  string(workloadv1beta2.AppWrapperResuming),
 			Message: "Reset complete; resuming",
 		})
-		return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperResuming)
+		return ctrl.Result{}, r.transitionToPhase(ctx, orig, aw, workloadv1beta2.AppWrapperResuming)

 	case workloadv1beta2.AppWrapperFailed:
 		// Support for debugging failed jobs.
@@ -423,6 +430,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		// a failed appwrapper unless Kueue preempts it by setting Suspend to true.
 		deletionDelay := r.deletionOnFailureGraceDuration(ctx, aw)

+		orig := copyForStatusPatch(aw)
 		if deletionDelay > 0 && !aw.Spec.Suspend {
 			meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
 				Type:   string(workloadv1beta2.DeletingResources),
@@ -435,13 +443,13 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			now := time.Now()
 			deadline := whenDelayed.Add(deletionDelay)
 			if now.Before(deadline) {
-				return requeueAfter(deadline.Sub(now), r.Status().Update(ctx, aw))
+				return requeueAfter(deadline.Sub(now), r.Status().Patch(ctx, aw, client.MergeFrom(orig)))
 			}
 		}

 		if meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.ResourcesDeployed)) {
 			if !r.deleteComponents(ctx, aw) {
-				return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
+				return requeueAfter(5*time.Second, r.Status().Patch(ctx, aw, client.MergeFrom(orig)))
 			}
 			msg := "Resources deleted for failed AppWrapper"
 			if deletionDelay > 0 && aw.Spec.Suspend {
@@ -460,7 +468,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			Reason:  string(workloadv1beta2.AppWrapperFailed),
 			Message: "No resources deployed",
 		})
-		return ctrl.Result{}, r.Status().Update(ctx, aw)
+		return ctrl.Result{}, r.Status().Patch(ctx, aw, client.MergeFrom(orig))

 	case workloadv1beta2.AppWrapperSucceeded:
 		if meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.ResourcesDeployed)) {
@@ -469,11 +477,12 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			now := time.Now()
 			deadline := whenSucceeded.Add(deletionDelay)
 			if now.Before(deadline) {
-				return requeueAfter(deadline.Sub(now), r.Status().Update(ctx, aw))
+				return requeueAfter(deadline.Sub(now), nil)
 			}

+			orig := copyForStatusPatch(aw)
 			if !r.deleteComponents(ctx, aw) {
-				return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
+				return requeueAfter(5*time.Second, r.Status().Patch(ctx, aw, client.MergeFrom(orig)))
 			}
 			meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
 				Type:   string(workloadv1beta2.ResourcesDeployed),
@@ -481,7 +490,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 				Reason:  string(workloadv1beta2.AppWrapperSucceeded),
 				Message: fmt.Sprintf("Time to live after success of %v expired", deletionDelay),
 			})
-			return ctrl.Result{}, r.Status().Update(ctx, aw)
+			return ctrl.Result{}, r.Status().Patch(ctx, aw, client.MergeFrom(orig))
 		}
 		return ctrl.Result{}, nil
 	}
@@ -489,22 +498,22 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	return ctrl.Result{}, nil
 }

-func (r *AppWrapperReconciler) updateStatus(ctx context.Context, aw *workloadv1beta2.AppWrapper, phase workloadv1beta2.AppWrapperPhase) (ctrl.Result, error) {
-	aw.Status.Phase = phase
-	if err := r.Status().Update(ctx, aw); err != nil {
-		return ctrl.Result{}, err
+func (r *AppWrapperReconciler) transitionToPhase(ctx context.Context, orig *workloadv1beta2.AppWrapper, modified *workloadv1beta2.AppWrapper, phase workloadv1beta2.AppWrapperPhase) error {
+	modified.Status.Phase = phase
+	if err := r.Status().Patch(ctx, modified, client.MergeFrom(orig)); err != nil {
+		return err
 	}
 	log.FromContext(ctx).Info(string(phase), "phase", phase)
-	return ctrl.Result{}, nil
+	return nil
 }

-func (r *AppWrapperReconciler) resetOrFail(ctx context.Context, aw *workloadv1beta2.AppWrapper, terminalFailure bool, retryIncrement int32) (ctrl.Result, error) {
+func (r *AppWrapperReconciler) resetOrFail(ctx context.Context, orig *workloadv1beta2.AppWrapper, aw *workloadv1beta2.AppWrapper, terminalFailure bool, retryIncrement int32) error {
 	maxRetries := r.retryLimit(ctx, aw)
 	if !terminalFailure && aw.Status.Retries < maxRetries {
 		aw.Status.Retries += retryIncrement
-		return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperResetting)
+		return r.transitionToPhase(ctx, orig, aw, workloadv1beta2.AppWrapperResetting)
 	} else {
-		return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperFailed)
+		return r.transitionToPhase(ctx, orig, aw, workloadv1beta2.AppWrapperFailed)
 	}
 }

@@ -897,6 +906,16 @@ func (r *AppWrapperReconciler) SetupWithManager(mgr ctrl.Manager) error {
 		Complete(r)
 }

+// copyForStatusPatch returns an AppWrapper with an empty Spec and a DeepCopy of orig's Status for use in a subsequent Status().Patch(...) call
+func copyForStatusPatch(orig *workloadv1beta2.AppWrapper) *workloadv1beta2.AppWrapper {
+	copy := workloadv1beta2.AppWrapper{
+		TypeMeta:   orig.TypeMeta,
+		ObjectMeta: orig.ObjectMeta,
+		Status:     *orig.Status.DeepCopy(),
+	}
+	return &copy
+}
+
 // requeueAfter requeues the request after the specified duration
 func requeueAfter(duration time.Duration, err error) (ctrl.Result, error) {
 	if err != nil {
diff --git a/internal/controller/appwrapper/resource_management.go b/internal/controller/appwrapper/resource_management.go
index b3b6562..f6e596a 100644
--- a/internal/controller/appwrapper/resource_management.go
+++ b/internal/controller/appwrapper/resource_management.go
@@ -279,6 +279,7 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
 		return err, true
 	}

+	orig := copyForStatusPatch(aw)
 	if meta.FindStatusCondition(aw.Status.ComponentStatus[componentIdx].Conditions, string(workloadv1beta2.ResourcesDeployed)) == nil {
 		aw.Status.ComponentStatus[componentIdx].Name = obj.GetName()
 		aw.Status.ComponentStatus[componentIdx].Kind = obj.GetKind()
@@ -288,7 +289,7 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
 			Status: metav1.ConditionUnknown,
 			Reason: "ComponentCreationInitiated",
 		})
-		if err := r.Status().Update(ctx, aw); err != nil {
+		if err := r.Status().Patch(ctx, aw, client.MergeFrom(orig)); err != nil {
 			return err, false
 		}
 	}
@@ -309,19 +310,21 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
 		}
 	}

+	orig = copyForStatusPatch(aw)
 	aw.Status.ComponentStatus[componentIdx].Name = obj.GetName() // Update name to support usage of GenerateName
 	meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{
 		Type:   string(workloadv1beta2.ResourcesDeployed),
 		Status: metav1.ConditionTrue,
 		Reason: "ComponentCreatedSuccessfully",
 	})
-	if err := r.Status().Update(ctx, aw); err != nil {
+	if err := r.Status().Patch(ctx, aw, client.MergeFrom(orig)); err != nil {
 		return err, false
 	}

 	return nil, false
 }

+// createComponents incrementally patches aw.Status -- MUST NOT CARRY STATUS PATCHES ACROSS INVOCATIONS
 func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *workloadv1beta2.AppWrapper) (error, bool) {
 	for componentIdx := range aw.Spec.Components {
 		if !meta.IsStatusConditionTrue(aw.Status.ComponentStatus[componentIdx].Conditions, string(workloadv1beta2.ResourcesDeployed)) {
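
For reference, a minimal sketch of the status-patch idiom this change adopts: snapshot the object's status, mutate the live object, then send only the delta via `Status().Patch` with `client.MergeFrom`. The import path and the helper names `snapshotStatus` and `setPhase` are illustrative assumptions, not part of the patch above.

```go
package appwrapper

import (
	"context"

	workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2" // assumed module path
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// snapshotStatus builds the base object for a client.MergeFrom status patch:
// object identity plus a deep copy of Status. Spec is intentionally left
// empty so the computed patch never includes spec fields.
func snapshotStatus(aw *workloadv1beta2.AppWrapper) *workloadv1beta2.AppWrapper {
	base := workloadv1beta2.AppWrapper{
		TypeMeta:   aw.TypeMeta,
		ObjectMeta: aw.ObjectMeta,
		Status:     *aw.Status.DeepCopy(),
	}
	return &base
}

// setPhase mutates the live object's status and patches only the difference
// between the snapshot and the mutated object, rather than writing the
// entire status subresource with Update.
func setPhase(ctx context.Context, c client.Client, aw *workloadv1beta2.AppWrapper, phase workloadv1beta2.AppWrapperPhase) error {
	base := snapshotStatus(aw) // take the snapshot before mutating aw.Status
	aw.Status.Phase = phase
	return c.Status().Patch(ctx, aw, client.MergeFrom(base))
}
```

Because `client.MergeFrom` diffs against the snapshot, the base must be captured before the status mutations it is meant to report, which is why the diff above takes `orig := copyForStatusPatch(aw)` at the top of each phase case.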