diff --git a/workflow/cron/operator.go b/workflow/cron/operator.go index b45ecdae90c5..33056db9ab2a 100644 --- a/workflow/cron/operator.go +++ b/workflow/cron/operator.go @@ -98,7 +98,6 @@ func (woc *cronWfOperationCtx) validateCronWorkflow() error { woc.reportCronWorkflowError(v1alpha1.ConditionTypeSpecError, fmt.Sprint(err)) } else { woc.cronWf.Status.Conditions.RemoveCondition(v1alpha1.ConditionTypeSpecError) - woc.persistUpdate() } return err } @@ -117,58 +116,68 @@ func getWorkflowObjectReference(wf *v1alpha1.Workflow, runWf *v1alpha1.Workflow) } func (woc *cronWfOperationCtx) persistUpdate() { - _, err := woc.cronWfIf.Update(woc.cronWf) + if woc.origCronWf.ResourceVersion != woc.cronWf.ResourceVersion { + woc.log.Error("cannot update cron workflow with mismatched resource versions") + return + } + cronWf, err := woc.cronWfIf.Update(woc.cronWf) if err != nil { - if errors.IsConflict(err) { - reapplyErr := woc.reapplyUpdate() - if reapplyErr != nil { - woc.log.WithError(reapplyErr).WithField("original error", err).Error("failed to update CronWorkflow after reapply attempt") - } - } else { + if !errors.IsConflict(err) { woc.log.WithError(err).Error("failed to update CronWorkflow") + return + } + var reapplyErr error + cronWf, reapplyErr = woc.reapplyUpdate() + if err != nil { + woc.log.WithError(reapplyErr).WithField("original error", err).Error("failed to update CronWorkflow after reapply attempt") + return } } + woc.cronWf = cronWf } -func (woc *cronWfOperationCtx) reapplyUpdate() error { +func (woc *cronWfOperationCtx) reapplyUpdate() (*v1alpha1.CronWorkflow, error) { + if woc.origCronWf.ResourceVersion != woc.cronWf.ResourceVersion { + return nil, fmt.Errorf("cannot re-apply cron workflow update with mismatched resource versions") + } orig, err := json.Marshal(woc.origCronWf) if err != nil { - return err + return nil, err } curr, err := json.Marshal(woc.cronWf) if err != nil { - return err + return nil, err } patch, err := jsonpatch.CreateMergePatch(orig, curr) if err != nil { - return err + return nil, err } attempts := 0 for { currCronWf, err := woc.cronWfIf.Get(woc.name, v1.GetOptions{}) if err != nil { - return err + return nil, err } currCronWfBytes, err := json.Marshal(currCronWf) if err != nil { - return err + return nil, err } newCronWfBytes, err := jsonpatch.MergePatch(currCronWfBytes, patch) if err != nil { - return err + return nil, err } var newCronWf v1alpha1.CronWorkflow err = json.Unmarshal(newCronWfBytes, &newCronWf) if err != nil { - return err + return nil, err } - _, err = woc.cronWfIf.Update(&newCronWf) + cronWf, err := woc.cronWfIf.Update(&newCronWf) if err == nil { - return nil + return cronWf, nil } attempts++ if attempts == 5 { - return fmt.Errorf("ran out of retries when trying to reapply update: %s", err) + return nil, fmt.Errorf("ran out of retries when trying to reapply update: %s", err) } } } diff --git a/workflow/cron/operator_test.go b/workflow/cron/operator_test.go index b27640e692eb..8fc69ab44ab1 100644 --- a/workflow/cron/operator_test.go +++ b/workflow/cron/operator_test.go @@ -174,6 +174,7 @@ func TestCronWorkflowConditionSubmissionError(t *testing.T) { wfClient: cs.ArgoprojV1alpha1().Workflows(""), cronWfIf: cs.ArgoprojV1alpha1().CronWorkflows(""), wfLister: &fakeLister{}, + origCronWf: cronWf.DeepCopy(), cronWf: &cronWf, log: logrus.WithFields(logrus.Fields{}), metrics: testMetrics, @@ -230,6 +231,7 @@ func TestSpecError(t *testing.T) { wfClient: cs.ArgoprojV1alpha1().Workflows(""), cronWfIf: cs.ArgoprojV1alpha1().CronWorkflows(""), wfLister: &fakeLister{}, + origCronWf: cronWf.DeepCopy(), cronWf: &cronWf, log: logrus.WithFields(logrus.Fields{}), metrics: testMetrics, @@ -265,7 +267,7 @@ func TestReapplyUpdate(t *testing.T) { } cronWf.Spec.Schedule = "1 * * * *" - err := woc.reapplyUpdate() + _, err := woc.reapplyUpdate() if assert.NoError(t, err) { updatedCronWf, err := woc.cronWfIf.Get("my-wf", v1.GetOptions{}) if assert.NoError(t, err) {