From 7dfa71b94241e543b30e197058e6c3c33db50c87 Mon Sep 17 00:00:00 2001 From: Ilias Katsakioris Date: Sat, 13 Apr 2019 00:19:15 +0300 Subject: [PATCH 1/2] Do parameter substitution in the `Volumes` attribute * Add `woc.volume` field, which is a DeepCopy of `wf.Spec.Volumes` * Use `woc.volume` instead of `wfSpec.Volumes` in workflowpod.go * Substitute output parameters in the `volumes` attribute everywhere where Argo already performed substitution. This includes `woc.resolveReferences()` and `woc.resolveDependencyReferences()`. * Perform global parameter substitution in the `volumes` attribute once the global parameters are set, too. * Fix volume tests to fill `woc.volumes` instead of `wf.Spec.Volumes` * Replace code with `scope.ReplaceMap()` in `woc.resolveReferences()` Signed-off-by: Ilias Katsakioris --- workflow/controller/dag.go | 7 +++++ workflow/controller/operator.go | 37 ++++++++++++++++++++++++- workflow/controller/steps.go | 15 +++++----- workflow/controller/workflowpod.go | 6 ++-- workflow/controller/workflowpod_test.go | 10 +++---- 5 files changed, 58 insertions(+), 17 deletions(-) diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 1a3f5cb40ccc..6ebf1d3f2c76 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -380,6 +380,13 @@ func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task } // Perform replacement + // Replace woc.volumes + err := woc.substituteParamsInVolumes(scope.replaceMap()) + if err != nil { + return nil, err + } + + // Replace task's parameters taskBytes, err := json.Marshal(task) if err != nil { return nil, errors.InternalWrapError(err) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 60ba3c4379c4..3e14d1b7611b 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -49,6 +49,9 @@ type wfOperationCtx struct { // globalParams holds any parameters that are available to be referenced // in the global scope (e.g. workflow.parameters.XXX). globalParams map[string]string + // volumes holds a DeepCopy of wf.Spec.Volumes to perform substitutions. + // It is then used in addVolumeReferences() when creating a pod. + volumes []apiv1.Volume // map of pods which need to be labeled with completed=true completedPods map[string]bool // deadline is the dealine time in which this operation should relinquish @@ -93,6 +96,7 @@ func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOper }), controller: wfc, globalParams: make(map[string]string), + volumes: wf.Spec.DeepCopy().Volumes, completedPods: make(map[string]bool), deadline: time.Now().UTC().Add(maxOperationTime), } @@ -158,7 +162,14 @@ func (woc *wfOperationCtx) operate() { woc.setGlobalParameters() - err := woc.createPVCs() + err := woc.substituteParamsInVolumes(woc.globalParams) + if err != nil { + woc.log.Errorf("%s volumes global param substitution error: %+v", woc.wf.ObjectMeta.Name, err) + woc.markWorkflowError(err, true) + return + } + + err = woc.createPVCs() if err != nil { woc.log.Errorf("%s pvc create error: %+v", woc.wf.ObjectMeta.Name, err) woc.markWorkflowError(err, true) @@ -1667,3 +1678,27 @@ func (woc *wfOperationCtx) checkAndCompress() error { return nil } + +func (woc *wfOperationCtx) substituteParamsInVolumes(params map[string]string) error { + if woc.volumes == nil { + return nil + } + + volumes := woc.volumes + volumesBytes, err := json.Marshal(volumes) + if err != nil { + return errors.InternalWrapError(err) + } + fstTmpl := fasttemplate.New(string(volumesBytes), "{{", "}}") + newVolumesStr, err := common.Replace(fstTmpl, params, true) + if err != nil { + return err + } + var newVolumes []apiv1.Volume + err = json.Unmarshal([]byte(newVolumesStr), &newVolumes) + if err != nil { + return errors.InternalWrapError(err) + } + woc.volumes = newVolumes + return nil +} diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go index e54ae2bb852a..9e55c88d3911 100644 --- a/workflow/controller/steps.go +++ b/workflow/controller/steps.go @@ -278,6 +278,12 @@ func shouldExecute(when string) (bool, error) { func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scope *wfScope) ([]wfv1.WorkflowStep, error) { newStepGroup := make([]wfv1.WorkflowStep, len(stepGroup)) + // Step 0: replace all parameter scope references for volumes + err := woc.substituteParamsInVolumes(scope.replaceMap()) + if err != nil { + return nil, err + } + for i, step := range stepGroup { // Step 1: replace all parameter scope references in the step // TODO: improve this @@ -285,15 +291,8 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop if err != nil { return nil, errors.InternalWrapError(err) } - replaceMap := make(map[string]string) - for key, val := range scope.scope { - valStr, ok := val.(string) - if ok { - replaceMap[key] = valStr - } - } fstTmpl := fasttemplate.New(string(stepBytes), "{{", "}}") - newStepStr, err := common.Replace(fstTmpl, replaceMap, true) + newStepStr, err := common.Replace(fstTmpl, scope.replaceMap(), true) if err != nil { return nil, err } diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 04872ece41c2..e815502fdcef 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -148,7 +148,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont addSchedulingConstraints(pod, wfSpec, tmpl) woc.addMetadata(pod, tmpl) - err = addVolumeReferences(pod, wfSpec, tmpl, woc.wf.Status.PersistentVolumeClaims) + err = addVolumeReferences(pod, woc.volumes, tmpl, woc.wf.Status.PersistentVolumeClaims) if err != nil { return nil, err } @@ -455,7 +455,7 @@ func addSchedulingConstraints(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *w // addVolumeReferences adds any volumeMounts that a container/sidecar is referencing, to the pod.spec.volumes // These are either specified in the workflow.spec.volumes or the workflow.spec.volumeClaimTemplate section -func addVolumeReferences(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *wfv1.Template, pvcs []apiv1.Volume) error { +func addVolumeReferences(pod *apiv1.Pod, vols []apiv1.Volume, tmpl *wfv1.Template, pvcs []apiv1.Volume) error { switch tmpl.GetType() { case wfv1.TemplateTypeContainer, wfv1.TemplateTypeScript: default: @@ -464,7 +464,7 @@ func addVolumeReferences(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *wfv1.T // getVolByName is a helper to retrieve a volume by its name, either from the volumes or claims section getVolByName := func(name string) *apiv1.Volume { - for _, vol := range wfSpec.Volumes { + for _, vol := range vols { if vol.Name == name { return &vol } diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go index 190c48d42c1c..61dda367ccb1 100644 --- a/workflow/controller/workflowpod_test.go +++ b/workflow/controller/workflowpod_test.go @@ -272,7 +272,7 @@ func TestVolumeAndVolumeMounts(t *testing.T) { // For Docker executor { woc := newWoc() - woc.wf.Spec.Volumes = volumes + woc.volumes = volumes woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorDocker @@ -291,7 +291,7 @@ func TestVolumeAndVolumeMounts(t *testing.T) { // For Kubelet executor { woc := newWoc() - woc.wf.Spec.Volumes = volumes + woc.volumes = volumes woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorKubelet @@ -309,7 +309,7 @@ func TestVolumeAndVolumeMounts(t *testing.T) { // For K8sAPI executor { woc := newWoc() - woc.wf.Spec.Volumes = volumes + woc.volumes = volumes woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorK8sAPI @@ -428,7 +428,7 @@ func TestInitContainers(t *testing.T) { mirrorVolumeMounts := true woc := newWoc() - woc.wf.Spec.Volumes = volumes + woc.volumes = volumes woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts woc.wf.Spec.Templates[0].InitContainers = []wfv1.UserContainer{ { @@ -466,7 +466,7 @@ func TestSidecars(t *testing.T) { mirrorVolumeMounts := true woc := newWoc() - woc.wf.Spec.Volumes = volumes + woc.volumes = volumes woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts woc.wf.Spec.Templates[0].Sidecars = []wfv1.UserContainer{ { From 42ecec250384f0b2490d999f3c69527ebeacc763 Mon Sep 17 00:00:00 2001 From: Ilias Katsakioris Date: Sat, 13 Apr 2019 00:19:15 +0300 Subject: [PATCH 2/2] Substitute input parameters in Pod * Only global parameters were substituted in createWorkflowPod() * Do substitution of input parameters as well because a VolumeSource may be unknown and depend on an input parameter Signed-off-by: Ilias Katsakioris --- workflow/controller/workflowpod.go | 25 +++++++++------ workflow/controller/workflowpod_test.go | 42 +++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 10 deletions(-) diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index e815502fdcef..7a4d6113a7ec 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -183,8 +183,13 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont pod.ObjectMeta.Annotations[common.AnnotationKeyTemplate] = string(tmplBytes) // Perform one last variable substitution here. Some variables come from the from workflow - // configmap (e.g. archive location), and were not substituted in executeTemplate. - pod, err = substituteGlobals(pod, woc.globalParams) + // configmap (e.g. archive location) or volumes attribute, and were not substituted + // in executeTemplate. + podParams := woc.globalParams + for _, inParam := range tmpl.Inputs.Parameters { + podParams["inputs.parameters."+inParam.Name] = *inParam.Value + } + pod, err = substitutePodParams(pod, podParams) if err != nil { return nil, err } @@ -220,20 +225,20 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont return created, nil } -// substituteGlobals returns a pod spec with global parameter references substituted as well as pod.name -func substituteGlobals(pod *apiv1.Pod, globalParams map[string]string) (*apiv1.Pod, error) { - newGlobalParams := make(map[string]string) - for k, v := range globalParams { - newGlobalParams[k] = v +// substitutePodParams returns a pod spec with parameter references substituted as well as pod.name +func substitutePodParams(pod *apiv1.Pod, podParams map[string]string) (*apiv1.Pod, error) { + newPodParams := make(map[string]string) + for k, v := range podParams { + newPodParams[k] = v } - newGlobalParams[common.LocalVarPodName] = pod.Name - globalParams = newGlobalParams + newPodParams[common.LocalVarPodName] = pod.Name + podParams = newPodParams specBytes, err := json.Marshal(pod) if err != nil { return nil, err } fstTmpl := fasttemplate.New(string(specBytes), "{{", "}}") - newSpecBytes, err := common.Replace(fstTmpl, globalParams, true) + newSpecBytes, err := common.Replace(fstTmpl, podParams, true) if err != nil { return nil, err } diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go index 61dda367ccb1..4e5efe2c52dc 100644 --- a/workflow/controller/workflowpod_test.go +++ b/workflow/controller/workflowpod_test.go @@ -325,6 +325,48 @@ func TestVolumeAndVolumeMounts(t *testing.T) { } } +func TestVolumesPodSubstitution(t *testing.T) { + volumes := []apiv1.Volume{ + { + Name: "volume-name", + VolumeSource: apiv1.VolumeSource{ + PersistentVolumeClaim: &apiv1.PersistentVolumeClaimVolumeSource{ + ClaimName: "{{inputs.parameters.volume-name}}", + }, + }, + }, + } + volumeMounts := []apiv1.VolumeMount{ + { + Name: "volume-name", + MountPath: "/test", + }, + } + tmpStr := "test-name" + inputParameters := []wfv1.Parameter{ + { + Name: "volume-name", + Value: &tmpStr, + }, + } + + woc := newWoc() + woc.volumes = volumes + woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts + woc.wf.Spec.Templates[0].Inputs.Parameters = inputParameters + woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorDocker + + woc.executeContainer(woc.wf.Spec.Entrypoint, &woc.wf.Spec.Templates[0], "") + podName := getPodName(woc.wf) + pod, err := woc.controller.kubeclientset.CoreV1().Pods("").Get(podName, metav1.GetOptions{}) + assert.Nil(t, err) + assert.Equal(t, 3, len(pod.Spec.Volumes)) + assert.Equal(t, "volume-name", pod.Spec.Volumes[2].Name) + assert.Equal(t, "test-name", pod.Spec.Volumes[2].PersistentVolumeClaim.ClaimName) + assert.Equal(t, 1, len(pod.Spec.Containers[1].VolumeMounts)) + assert.Equal(t, "volume-name", pod.Spec.Containers[1].VolumeMounts[0].Name) +} + func TestOutOfCluster(t *testing.T) { verifyKubeConfigVolume := func(ctr apiv1.Container, volName, mountPath string) {