fix(controller): Cron re-apply update #3883

Merged (4 commits) on Aug 31, 2020
Changes from 1 commit
47 changes: 28 additions & 19 deletions workflow/cron/operator.go
@@ -98,7 +98,6 @@ func (woc *cronWfOperationCtx) validateCronWorkflow() error {
 		woc.reportCronWorkflowError(v1alpha1.ConditionTypeSpecError, fmt.Sprint(err))
 	} else {
 		woc.cronWf.Status.Conditions.RemoveCondition(v1alpha1.ConditionTypeSpecError)
-		woc.persistUpdate()
Contributor Author

I think this is the bug: we cannot reapplyUpdate twice.

Member

Good catch

 	}
 	return err
 }
@@ -117,58 +116,68 @@ func getWorkflowObjectReference(wf *v1alpha1.Workflow, runWf *v1alpha1.Workflow)
 }

 func (woc *cronWfOperationCtx) persistUpdate() {
-	_, err := woc.cronWfIf.Update(woc.cronWf)
+	if woc.origCronWf.ResourceVersion != woc.cronWf.ResourceVersion {
Contributor Author

This is a guard rail.

Member

This might also not be necessary depending on https://github.com/argoproj/argo/pull/3883/files#r480375881

+		woc.log.Error("cannot update cron workflow with mismatched resource versions")
+		return
+	}
+	cronWf, err := woc.cronWfIf.Update(woc.cronWf)
 	if err != nil {
-		if errors.IsConflict(err) {
-			reapplyErr := woc.reapplyUpdate()
-			if reapplyErr != nil {
-				woc.log.WithError(reapplyErr).WithField("original error", err).Error("failed to update CronWorkflow after reapply attempt")
-			}
-		} else {
+		if !errors.IsConflict(err) {
 			woc.log.WithError(err).Error("failed to update CronWorkflow")
+			return
+		}
+		var reapplyErr error
+		cronWf, reapplyErr = woc.reapplyUpdate()
+		if err != nil {
+			woc.log.WithError(reapplyErr).WithField("original error", err).Error("failed to update CronWorkflow after reapply attempt")
+			return
 		}
 	}
+	woc.cronWf = cronWf
Member

👍

Member

Actually, I'm not too sure what the purpose of this is. In theory, once persistUpdate is called, it should be the end of the cron operation context and nothing else should access woc.cronWf. Could you explain why this line is necessary?

Contributor Author

I've included this so that subsequent calls to this func will have the updated version, and as a result cannot accidentally reapplyUpdate twice.

Member

> I've included this so that subsequent calls to this func will have the updated version, and as a result cannot accidentally reapplyUpdate twice.

In that case, wouldn't the guard rail here (#3883 (comment)) always fail? The resource version of the new woc.cronWf will always be different from the old one.

It seems like we don't intend to call persistUpdate more than once at all. Should we instead ensure that it doesn't happen altogether?

Contributor Author

My goal is to make sure we don't regress, by putting checks in place. I would want the guard condition to fail, as it indicates a programming error.
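
The flow discussed in this thread can be sketched in isolation. The following is a minimal sketch, not the controller's code: `cronWf`, `fakeAPI`, `opCtx`, `errConflict`, and the integer `Version` field are hypothetical stand-ins for `v1alpha1.CronWorkflow`, the CronWorkflow client, `cronWfOperationCtx`, a Kubernetes conflict error, and the string `ResourceVersion`. It shows a first persist that hits a conflict, re-applies against the latest server copy, and adopts the returned object, after which a second persist trips the guard rail. The sketch tests `reapplyErr` after the re-apply attempt; on that path the original `err` is necessarily non-nil, so testing `err` there would always take the failure branch.

```go
package main

import (
	"errors"
	"fmt"
)

// cronWf is a hypothetical stand-in for v1alpha1.CronWorkflow.
type cronWf struct {
	Version  int // stand-in for ResourceVersion (an assumption; the real field is a string)
	Schedule string
}

var (
	errConflict = errors.New("conflict")
	errMismatch = errors.New("cannot update cron workflow with mismatched resource versions")
)

// fakeAPI is a hypothetical in-memory server holding one object.
type fakeAPI struct{ stored cronWf }

// update rejects writes made from a stale version (optimistic concurrency).
func (a *fakeAPI) update(wf cronWf) (*cronWf, error) {
	if wf.Version != a.stored.Version {
		return nil, errConflict
	}
	wf.Version++
	a.stored = wf
	out := wf
	return &out, nil
}

// opCtx is a hypothetical stand-in for cronWfOperationCtx.
type opCtx struct {
	api  *fakeAPI
	orig cronWf // snapshot taken when the operation started
	curr cronWf // working copy mutated during the operation
}

// reapply copies the local change onto the latest server copy and retries once.
func (c *opCtx) reapply() (*cronWf, error) {
	if c.orig.Version != c.curr.Version {
		return nil, errMismatch
	}
	latest := c.api.stored
	latest.Schedule = c.curr.Schedule // stand-in for applying a merge patch
	return c.api.update(latest)
}

// persist mirrors the shape of persistUpdate: guard, update, re-apply on conflict.
func (c *opCtx) persist() error {
	if c.orig.Version != c.curr.Version {
		return errMismatch // guard rail: persist was already called
	}
	updated, err := c.api.update(c.curr)
	if err != nil {
		if !errors.Is(err, errConflict) {
			return err
		}
		var reapplyErr error
		updated, reapplyErr = c.reapply()
		if reapplyErr != nil { // check the reapply result, not the original err
			return fmt.Errorf("reapply after %v: %w", err, reapplyErr)
		}
	}
	c.curr = *updated // adopt the server's copy, including its new version
	return nil
}

func main() {
	api := &fakeAPI{stored: cronWf{Version: 1, Schedule: "* * * * *"}}
	c := &opCtx{api: api, orig: api.stored, curr: api.stored}
	c.curr.Schedule = "1 * * * *"
	api.stored.Version = 2 // a concurrent writer bumped the server copy
	fmt.Println("first persist:", c.persist())
	fmt.Println("second persist:", c.persist())
}
```

In this sketch the second call fails by design, which matches the author's stated intent that a tripped guard signals a programming error rather than a recoverable condition.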

 }

-func (woc *cronWfOperationCtx) reapplyUpdate() error {
+func (woc *cronWfOperationCtx) reapplyUpdate() (*v1alpha1.CronWorkflow, error) {
+	if woc.origCronWf.ResourceVersion != woc.cronWf.ResourceVersion {
+		return nil, fmt.Errorf("cannot re-apply cron workflow update with mismatched resource versions")
+	}
 	orig, err := json.Marshal(woc.origCronWf)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	curr, err := json.Marshal(woc.cronWf)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	patch, err := jsonpatch.CreateMergePatch(orig, curr)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	attempts := 0
 	for {
 		currCronWf, err := woc.cronWfIf.Get(woc.name, v1.GetOptions{})
 		if err != nil {
-			return err
+			return nil, err
 		}
 		currCronWfBytes, err := json.Marshal(currCronWf)
 		if err != nil {
-			return err
+			return nil, err
 		}
 		newCronWfBytes, err := jsonpatch.MergePatch(currCronWfBytes, patch)
 		if err != nil {
-			return err
+			return nil, err
 		}
 		var newCronWf v1alpha1.CronWorkflow
 		err = json.Unmarshal(newCronWfBytes, &newCronWf)
 		if err != nil {
-			return err
+			return nil, err
 		}
-		_, err = woc.cronWfIf.Update(&newCronWf)
+		cronWf, err := woc.cronWfIf.Update(&newCronWf)
 		if err == nil {
-			return nil
+			return cronWf, nil
 		}
 		attempts++
 		if attempts == 5 {
-			return fmt.Errorf("ran out of retries when trying to reapply update: %s", err)
+			return nil, fmt.Errorf("ran out of retries when trying to reapply update: %s", err)
 		}
 	}
 }
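
The re-apply step in reapplyUpdate computes a merge patch between the original and the locally modified object, then applies that patch to the latest server copy. The following is a minimal, standard-library-only sketch of that idea. `createMergePatch` and `mergePatch` here are simplified, hypothetical stand-ins written for illustration, not the `jsonpatch` library functions used in the diff, and the `spec.suspend` field is only an example of a concurrent change. The retry loop and client calls are omitted.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// doc is a decoded JSON object.
type doc = map[string]interface{}

// createMergePatch returns a JSON-merge-patch-style diff that turns orig into curr.
// Simplified illustration: nested objects are diffed recursively, other values replaced.
func createMergePatch(orig, curr doc) doc {
	patch := doc{}
	for k, cv := range curr {
		ov, ok := orig[k]
		if !ok {
			patch[k] = cv
			continue
		}
		om, oIsMap := ov.(doc)
		cm, cIsMap := cv.(doc)
		if oIsMap && cIsMap {
			if sub := createMergePatch(om, cm); len(sub) > 0 {
				patch[k] = sub
			}
		} else if !reflect.DeepEqual(ov, cv) {
			patch[k] = cv
		}
	}
	for k := range orig {
		if _, ok := curr[k]; !ok {
			patch[k] = nil // a null in the patch deletes the key
		}
	}
	return patch
}

// mergePatch applies patch to target and returns a new document.
func mergePatch(target, patch doc) doc {
	out := doc{}
	for k, v := range target {
		out[k] = v
	}
	for k, pv := range patch {
		if pv == nil {
			delete(out, k)
			continue
		}
		pm, pIsMap := pv.(doc)
		tm, tIsMap := out[k].(doc)
		switch {
		case pIsMap && tIsMap:
			out[k] = mergePatch(tm, pm)
		case pIsMap:
			out[k] = mergePatch(doc{}, pm)
		default:
			out[k] = pv
		}
	}
	return out
}

// reapply carries the local change (orig -> curr) over to the latest server copy.
func reapply(orig, curr, latest doc) doc {
	return mergePatch(latest, createMergePatch(orig, curr))
}

// mustParse decodes a JSON object literal, panicking on malformed input.
func mustParse(s string) doc {
	var d doc
	if err := json.Unmarshal([]byte(s), &d); err != nil {
		panic(err)
	}
	return d
}

func main() {
	orig := mustParse(`{"spec":{"schedule":"* * * * *","suspend":false}}`)
	curr := mustParse(`{"spec":{"schedule":"1 * * * *","suspend":false}}`)
	latest := mustParse(`{"spec":{"schedule":"* * * * *","suspend":true}}`)
	out, _ := json.Marshal(reapply(orig, curr, latest))
	fmt.Println(string(out))
}
```

Because the patch contains only the fields changed locally (here, the schedule), the concurrent change to `suspend` on the server copy survives the re-apply.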
4 changes: 3 additions & 1 deletion workflow/cron/operator_test.go
@@ -174,6 +174,7 @@ func TestCronWorkflowConditionSubmissionError(t *testing.T) {
 		wfClient: cs.ArgoprojV1alpha1().Workflows(""),
 		cronWfIf: cs.ArgoprojV1alpha1().CronWorkflows(""),
 		wfLister: &fakeLister{},
+		origCronWf: cronWf.DeepCopy(),
 		cronWf: &cronWf,
 		log: logrus.WithFields(logrus.Fields{}),
 		metrics: testMetrics,
@@ -230,6 +231,7 @@ func TestSpecError(t *testing.T) {
 		wfClient: cs.ArgoprojV1alpha1().Workflows(""),
 		cronWfIf: cs.ArgoprojV1alpha1().CronWorkflows(""),
 		wfLister: &fakeLister{},
+		origCronWf: cronWf.DeepCopy(),
 		cronWf: &cronWf,
 		log: logrus.WithFields(logrus.Fields{}),
 		metrics: testMetrics,
@@ -265,7 +267,7 @@ func TestReapplyUpdate(t *testing.T) {
 	}

 	cronWf.Spec.Schedule = "1 * * * *"
-	err := woc.reapplyUpdate()
+	_, err := woc.reapplyUpdate()
 	if assert.NoError(t, err) {
 		updatedCronWf, err := woc.cronWfIf.Get("my-wf", v1.GetOptions{})
 		if assert.NoError(t, err) {