Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support parameter substitution in the volumes attribute #1238

Merged
merged 2 commits into from
Apr 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,13 @@ func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task
}

// Perform replacement
// Replace woc.volumes
err := woc.substituteParamsInVolumes(scope.replaceMap())
if err != nil {
return nil, err
}

// Replace task's parameters
taskBytes, err := json.Marshal(task)
if err != nil {
return nil, errors.InternalWrapError(err)
Expand Down
37 changes: 36 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type wfOperationCtx struct {
// globalParams holds any parameters that are available to be referenced
// in the global scope (e.g. workflow.parameters.XXX).
globalParams map[string]string
// volumes holds a DeepCopy of wf.Spec.Volumes to perform substitutions.
// It is then used in addVolumeReferences() when creating a pod.
volumes []apiv1.Volume
// map of pods which need to be labeled with completed=true
completedPods map[string]bool
// deadline is the dealine time in which this operation should relinquish
Expand Down Expand Up @@ -93,6 +96,7 @@ func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOper
}),
controller: wfc,
globalParams: make(map[string]string),
volumes: wf.Spec.DeepCopy().Volumes,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why make a new field when you can just access woc.wf.Spec.Volumes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was my misunderstanding as well, but @jessesuen explained it to me (the conversation is lost due to my force pushes).

We need to copy this field (or any field we need to modify) because woc.wf is always the value in the etcd. Thus, there is no reproducibility.
If we use woc.wf.Spec.Volumes and the user wants to let's say retry the wf, the {{...}} strings will be substituted the first time the wf was run, so the behavior will not be as desired.

completedPods: make(map[string]bool),
deadline: time.Now().UTC().Add(maxOperationTime),
}
Expand Down Expand Up @@ -158,7 +162,14 @@ func (woc *wfOperationCtx) operate() {

woc.setGlobalParameters()

err := woc.createPVCs()
err := woc.substituteParamsInVolumes(woc.globalParams)
if err != nil {
woc.log.Errorf("%s volumes global param substitution error: %+v", woc.wf.ObjectMeta.Name, err)
woc.markWorkflowError(err, true)
return
}

err = woc.createPVCs()
if err != nil {
woc.log.Errorf("%s pvc create error: %+v", woc.wf.ObjectMeta.Name, err)
woc.markWorkflowError(err, true)
Expand Down Expand Up @@ -1667,3 +1678,27 @@ func (woc *wfOperationCtx) checkAndCompress() error {

return nil
}

func (woc *wfOperationCtx) substituteParamsInVolumes(params map[string]string) error {
if woc.volumes == nil {
return nil
}

volumes := woc.volumes
volumesBytes, err := json.Marshal(volumes)
if err != nil {
return errors.InternalWrapError(err)
}
fstTmpl := fasttemplate.New(string(volumesBytes), "{{", "}}")
newVolumesStr, err := common.Replace(fstTmpl, params, true)
if err != nil {
return err
}
var newVolumes []apiv1.Volume
err = json.Unmarshal([]byte(newVolumesStr), &newVolumes)
if err != nil {
return errors.InternalWrapError(err)
}
woc.volumes = newVolumes
return nil
}
15 changes: 7 additions & 8 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,22 +278,21 @@ func shouldExecute(when string) (bool, error) {
func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scope *wfScope) ([]wfv1.WorkflowStep, error) {
newStepGroup := make([]wfv1.WorkflowStep, len(stepGroup))

// Step 0: replace all parameter scope references for volumes
err := woc.substituteParamsInVolumes(scope.replaceMap())
if err != nil {
return nil, err
}

for i, step := range stepGroup {
// Step 1: replace all parameter scope references in the step
// TODO: improve this
stepBytes, err := json.Marshal(step)
if err != nil {
return nil, errors.InternalWrapError(err)
}
replaceMap := make(map[string]string)
for key, val := range scope.scope {
valStr, ok := val.(string)
if ok {
replaceMap[key] = valStr
}
}
fstTmpl := fasttemplate.New(string(stepBytes), "{{", "}}")
newStepStr, err := common.Replace(fstTmpl, replaceMap, true)
newStepStr, err := common.Replace(fstTmpl, scope.replaceMap(), true)
if err != nil {
return nil, err
}
Expand Down
31 changes: 18 additions & 13 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
addSchedulingConstraints(pod, wfSpec, tmpl)
woc.addMetadata(pod, tmpl)

err = addVolumeReferences(pod, wfSpec, tmpl, woc.wf.Status.PersistentVolumeClaims)
err = addVolumeReferences(pod, woc.volumes, tmpl, woc.wf.Status.PersistentVolumeClaims)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -183,8 +183,13 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
pod.ObjectMeta.Annotations[common.AnnotationKeyTemplate] = string(tmplBytes)

// Perform one last variable substitution here. Some variables come from the from workflow
// configmap (e.g. archive location), and were not substituted in executeTemplate.
pod, err = substituteGlobals(pod, woc.globalParams)
// configmap (e.g. archive location) or volumes attribute, and were not substituted
// in executeTemplate.
podParams := woc.globalParams
for _, inParam := range tmpl.Inputs.Parameters {
podParams["inputs.parameters."+inParam.Name] = *inParam.Value
}
pod, err = substitutePodParams(pod, podParams)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -220,20 +225,20 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
return created, nil
}

// substituteGlobals returns a pod spec with global parameter references substituted as well as pod.name
func substituteGlobals(pod *apiv1.Pod, globalParams map[string]string) (*apiv1.Pod, error) {
newGlobalParams := make(map[string]string)
for k, v := range globalParams {
newGlobalParams[k] = v
// substitutePodParams returns a pod spec with parameter references substituted as well as pod.name
func substitutePodParams(pod *apiv1.Pod, podParams map[string]string) (*apiv1.Pod, error) {
newPodParams := make(map[string]string)
for k, v := range podParams {
newPodParams[k] = v
}
newGlobalParams[common.LocalVarPodName] = pod.Name
globalParams = newGlobalParams
newPodParams[common.LocalVarPodName] = pod.Name
podParams = newPodParams
specBytes, err := json.Marshal(pod)
if err != nil {
return nil, err
}
fstTmpl := fasttemplate.New(string(specBytes), "{{", "}}")
newSpecBytes, err := common.Replace(fstTmpl, globalParams, true)
newSpecBytes, err := common.Replace(fstTmpl, podParams, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -455,7 +460,7 @@ func addSchedulingConstraints(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *w

// addVolumeReferences adds any volumeMounts that a container/sidecar is referencing, to the pod.spec.volumes
// These are either specified in the workflow.spec.volumes or the workflow.spec.volumeClaimTemplate section
func addVolumeReferences(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *wfv1.Template, pvcs []apiv1.Volume) error {
func addVolumeReferences(pod *apiv1.Pod, vols []apiv1.Volume, tmpl *wfv1.Template, pvcs []apiv1.Volume) error {
switch tmpl.GetType() {
case wfv1.TemplateTypeContainer, wfv1.TemplateTypeScript:
default:
Expand All @@ -464,7 +469,7 @@ func addVolumeReferences(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *wfv1.T

// getVolByName is a helper to retrieve a volume by its name, either from the volumes or claims section
getVolByName := func(name string) *apiv1.Volume {
for _, vol := range wfSpec.Volumes {
for _, vol := range vols {
if vol.Name == name {
return &vol
}
Expand Down
52 changes: 47 additions & 5 deletions workflow/controller/workflowpod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func TestVolumeAndVolumeMounts(t *testing.T) {
// For Docker executor
{
woc := newWoc()
woc.wf.Spec.Volumes = volumes
woc.volumes = volumes
woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts
woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorDocker

Expand All @@ -291,7 +291,7 @@ func TestVolumeAndVolumeMounts(t *testing.T) {
// For Kubelet executor
{
woc := newWoc()
woc.wf.Spec.Volumes = volumes
woc.volumes = volumes
woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts
woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorKubelet

Expand All @@ -309,7 +309,7 @@ func TestVolumeAndVolumeMounts(t *testing.T) {
// For K8sAPI executor
{
woc := newWoc()
woc.wf.Spec.Volumes = volumes
woc.volumes = volumes
woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts
woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorK8sAPI

Expand All @@ -325,6 +325,48 @@ func TestVolumeAndVolumeMounts(t *testing.T) {
}
}

func TestVolumesPodSubstitution(t *testing.T) {
volumes := []apiv1.Volume{
{
Name: "volume-name",
VolumeSource: apiv1.VolumeSource{
PersistentVolumeClaim: &apiv1.PersistentVolumeClaimVolumeSource{
ClaimName: "{{inputs.parameters.volume-name}}",
},
},
},
}
volumeMounts := []apiv1.VolumeMount{
{
Name: "volume-name",
MountPath: "/test",
},
}
tmpStr := "test-name"
inputParameters := []wfv1.Parameter{
{
Name: "volume-name",
Value: &tmpStr,
},
}

woc := newWoc()
woc.volumes = volumes
woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts
woc.wf.Spec.Templates[0].Inputs.Parameters = inputParameters
woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorDocker

woc.executeContainer(woc.wf.Spec.Entrypoint, &woc.wf.Spec.Templates[0], "")
podName := getPodName(woc.wf)
pod, err := woc.controller.kubeclientset.CoreV1().Pods("").Get(podName, metav1.GetOptions{})
assert.Nil(t, err)
assert.Equal(t, 3, len(pod.Spec.Volumes))
assert.Equal(t, "volume-name", pod.Spec.Volumes[2].Name)
assert.Equal(t, "test-name", pod.Spec.Volumes[2].PersistentVolumeClaim.ClaimName)
assert.Equal(t, 1, len(pod.Spec.Containers[1].VolumeMounts))
assert.Equal(t, "volume-name", pod.Spec.Containers[1].VolumeMounts[0].Name)
}

func TestOutOfCluster(t *testing.T) {

verifyKubeConfigVolume := func(ctr apiv1.Container, volName, mountPath string) {
Expand Down Expand Up @@ -428,7 +470,7 @@ func TestInitContainers(t *testing.T) {
mirrorVolumeMounts := true

woc := newWoc()
woc.wf.Spec.Volumes = volumes
woc.volumes = volumes
woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts
woc.wf.Spec.Templates[0].InitContainers = []wfv1.UserContainer{
{
Expand Down Expand Up @@ -466,7 +508,7 @@ func TestSidecars(t *testing.T) {
mirrorVolumeMounts := true

woc := newWoc()
woc.wf.Spec.Volumes = volumes
woc.volumes = volumes
woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts
woc.wf.Spec.Templates[0].Sidecars = []wfv1.UserContainer{
{
Expand Down