Skip to content

Commit

Permalink
fix(controller): Cron re-apply update (#3883)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec authored Aug 31, 2020
1 parent fd3fca8 commit 5b5d235
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 20 deletions.
47 changes: 28 additions & 19 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func (woc *cronWfOperationCtx) validateCronWorkflow() error {
woc.reportCronWorkflowError(v1alpha1.ConditionTypeSpecError, fmt.Sprint(err))
} else {
woc.cronWf.Status.Conditions.RemoveCondition(v1alpha1.ConditionTypeSpecError)
woc.persistUpdate()
}
return err
}
Expand All @@ -117,58 +116,68 @@ func getWorkflowObjectReference(wf *v1alpha1.Workflow, runWf *v1alpha1.Workflow)
}

func (woc *cronWfOperationCtx) persistUpdate() {
_, err := woc.cronWfIf.Update(woc.cronWf)
if woc.origCronWf.ResourceVersion != woc.cronWf.ResourceVersion {
woc.log.Error("cannot update cron workflow with mismatched resource versions")
return
}
cronWf, err := woc.cronWfIf.Update(woc.cronWf)
if err != nil {
if errors.IsConflict(err) {
reapplyErr := woc.reapplyUpdate()
if reapplyErr != nil {
woc.log.WithError(reapplyErr).WithField("original error", err).Error("failed to update CronWorkflow after reapply attempt")
}
} else {
if !errors.IsConflict(err) {
woc.log.WithError(err).Error("failed to update CronWorkflow")
return
}
var reapplyErr error
cronWf, reapplyErr = woc.reapplyUpdate()
if err != nil {
woc.log.WithError(reapplyErr).WithField("original error", err).Error("failed to update CronWorkflow after reapply attempt")
return
}
}
woc.cronWf = cronWf
}

func (woc *cronWfOperationCtx) reapplyUpdate() error {
func (woc *cronWfOperationCtx) reapplyUpdate() (*v1alpha1.CronWorkflow, error) {
if woc.origCronWf.ResourceVersion != woc.cronWf.ResourceVersion {
return nil, fmt.Errorf("cannot re-apply cron workflow update with mismatched resource versions")
}
orig, err := json.Marshal(woc.origCronWf)
if err != nil {
return err
return nil, err
}
curr, err := json.Marshal(woc.cronWf)
if err != nil {
return err
return nil, err
}
patch, err := jsonpatch.CreateMergePatch(orig, curr)
if err != nil {
return err
return nil, err
}
attempts := 0
for {
currCronWf, err := woc.cronWfIf.Get(woc.name, v1.GetOptions{})
if err != nil {
return err
return nil, err
}
currCronWfBytes, err := json.Marshal(currCronWf)
if err != nil {
return err
return nil, err
}
newCronWfBytes, err := jsonpatch.MergePatch(currCronWfBytes, patch)
if err != nil {
return err
return nil, err
}
var newCronWf v1alpha1.CronWorkflow
err = json.Unmarshal(newCronWfBytes, &newCronWf)
if err != nil {
return err
return nil, err
}
_, err = woc.cronWfIf.Update(&newCronWf)
cronWf, err := woc.cronWfIf.Update(&newCronWf)
if err == nil {
return nil
return cronWf, nil
}
attempts++
if attempts == 5 {
return fmt.Errorf("ran out of retries when trying to reapply update: %s", err)
return nil, fmt.Errorf("ran out of retries when trying to reapply update: %s", err)
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion workflow/cron/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func TestCronWorkflowConditionSubmissionError(t *testing.T) {
wfClient: cs.ArgoprojV1alpha1().Workflows(""),
cronWfIf: cs.ArgoprojV1alpha1().CronWorkflows(""),
wfLister: &fakeLister{},
origCronWf: cronWf.DeepCopy(),
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
metrics: testMetrics,
Expand Down Expand Up @@ -230,6 +231,7 @@ func TestSpecError(t *testing.T) {
wfClient: cs.ArgoprojV1alpha1().Workflows(""),
cronWfIf: cs.ArgoprojV1alpha1().CronWorkflows(""),
wfLister: &fakeLister{},
origCronWf: cronWf.DeepCopy(),
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
metrics: testMetrics,
Expand Down Expand Up @@ -265,7 +267,7 @@ func TestReapplyUpdate(t *testing.T) {
}

cronWf.Spec.Schedule = "1 * * * *"
err := woc.reapplyUpdate()
_, err := woc.reapplyUpdate()
if assert.NoError(t, err) {
updatedCronWf, err := woc.cronWfIf.Get("my-wf", v1.GetOptions{})
if assert.NoError(t, err) {
Expand Down

0 comments on commit 5b5d235

Please sign in to comment.