Skip to content

Commit

Permalink
fix(sdk): fix nested loop counter param bug (kubeflow#1080)
Browse files Browse the repository at this point in the history
* fix nested loop counter param bug

* address comments
  • Loading branch information
Tomcli authored Nov 14, 2022
1 parent 6117df5 commit 5767673
Show file tree
Hide file tree
Showing 5 changed files with 359 additions and 2 deletions.
18 changes: 16 additions & 2 deletions sdk/python/kfp_tekton/compiler/_tekton_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,10 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict, recursive_task
custom_task_cr['spec']['iterationNumberParam']]
custom_task_cr['spec']['iterateParam'] = custom_task[custom_task_key]['loop_args']
separator = custom_task[custom_task_key].get('separator')
start_end_step_keys = ['from', 'to', 'step']
if separator is not None:
custom_task_cr['spec']['iterateParamStringSeparator'] = separator
if custom_task[custom_task_key].get('start') is not None:
start_end_step_keys = ['from', 'to', 'step']
custom_task_cr['spec']['pipelineSpec']['params'] = [value for value
in custom_task_cr['spec']['pipelineSpec']['params']
if value['name'] not in start_end_step_keys]
Expand All @@ -270,10 +270,24 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict, recursive_task

custom_task_cr['spec']['iterateNumeric'] = custom_task_cr['spec']['iterateParam']
custom_task_cr['spec'].pop('iterateParam')
# check whether or not the nested custom task param references need to be replaced
custom_task_param_map = {}
for custom_task_param in custom_task[custom_task_key]['spec']['params']:
if custom_task_param['name'] != custom_task[custom_task_key]['loop_args'] and '$(tasks.' in custom_task_param['value']:
custom_task_param_map.setdefault(custom_task_param['value'], []).append('$(params.%s)' % custom_task_param['name'])
for key, item in custom_task_param_map.items():
replacement_item = None
if len(item) == 1:
replacement_item = item[0]
if len(item) > 1:
forbidden_keystrings = ['$(params.%s)' % x for x in start_end_step_keys]
for i in item:
if i not in forbidden_keystrings:
replacement_item = i
break
if replacement_item:
custom_task_cr = json.loads(
json.dumps(custom_task_cr).replace(custom_task_param['value'], '$(params.%s)' % custom_task_param['name']))
json.dumps(custom_task_cr).replace(key, replacement_item))

# remove separator from CR params
if custom_task[custom_task_key].get('separator') is not None:
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ def test_recur_cond_workflow(self):
from .testdata.tekton_loop_dsl import pipeline
self._test_pipeline_workflow(pipeline, 'tekton_loop_dsl.yaml')

def test_nested_loop_counter_param_workflow(self):
    """
    Compile the tekton nested-loop pipeline that uses counter params by
    handing it to ``_test_pipeline_workflow`` together with the
    'nested_loop_counter_param.yaml' file name.
    """
    from .testdata.nested_loop_counter_param import output_in_range_and_pass as pipeline_func
    golden_yaml = 'nested_loop_counter_param.yaml'
    self._test_pipeline_workflow(pipeline_func, golden_yaml)

def test_nested_loop_same_arg_workflow(self):
"""
Test compiling a nested loop with same argument workflow.
Expand Down
111 changes: 111 additions & 0 deletions sdk/python/tests/compiler/testdata/nested_loop_counter_param.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright 2022 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools

from kfp import dsl, components
from kfp.components import load_component_from_text
from kfp_tekton.tekton import TEKTON_CUSTOM_TASK_IMAGES, Loop
from kfp_tekton.compiler import TektonCompiler
import yaml


ARTIFACT_FETCHER_IMAGE_NAME = "fetcher/image:latest"
# Add the fetcher image to the TEKTON_CUSTOM_TASK_IMAGES list imported from
# kfp_tekton.tekton. list.append() mutates the list in place and returns None,
# so its result must not be assigned back to the name (doing so would rebind
# TEKTON_CUSTOM_TASK_IMAGES to None inside this module).
TEKTON_CUSTOM_TASK_IMAGES.append(ARTIFACT_FETCHER_IMAGE_NAME)

# Number of templates artifact_fetcher() has generated so far; it is embedded
# in each generated template name.
_artifact_fetcher_no = 0


def artifact_fetcher(**artifact_paths: str):
    """Create an op from a generated component spec that takes artifact paths.

    Each keyword name becomes a ``String`` input and an ``Artifact`` output of
    the generated spec, and is appended to the container arguments as a
    ``--<name>`` flag followed by an ``inputValue`` placeholder for that name.
    The spec name carries the current value of the module-level counter, which
    is incremented on every call.

    Args:
        **artifact_paths: Mapping of input name to the value passed for it.

    Returns:
        The op produced by instantiating the loaded component with
        ``artifact_paths``, annotated with ``valid_container: "false"``.
    """
    global _artifact_fetcher_no

    names = list(artifact_paths)
    input_specs = [{'name': n, 'type': 'String'} for n in names]
    output_specs = [{'name': n, 'type': 'Artifact'} for n in names]

    # Fixed flags first, then one "--<name> <placeholder>" pair per input.
    container_args = [
        '--apiVersion', 'fetcher.tekton.dev/v1alpha1',
        '--kind', 'FETCHER',
        '--name', 'artifact_fetcher',
    ]
    for n in names:
        container_args += [f'--{n}', {'inputValue': n}]

    spec = {
        'name': f'artifact-fetcher-{_artifact_fetcher_no}',
        'description': 'Artifact Fetch',
        'inputs': input_specs,
        'outputs': output_specs,
        'implementation': {
            'container': {
                'image': ARTIFACT_FETCHER_IMAGE_NAME,
                'command': ['sh', '-c'],  # irrelevant
                'args': container_args,
            }
        }
    }
    # Bump the counter before loading, exactly once per call.
    _artifact_fetcher_no += 1

    spec_text = yaml.dump(spec, Dumper=yaml.SafeDumper)
    fetcher_factory = components.load_component_from_text(spec_text)
    fetcher_op = fetcher_factory(**artifact_paths)
    fetcher_op.add_pod_annotation("valid_container", "false")
    return fetcher_op


class Coder:
    """Holder for a method that yields an empty code string."""

    def empty(self):
        """Return the empty string; the instance is not consulted."""
        return ""


# Replace TektonCompiler._get_unique_id_code with Coder.empty, which always
# returns "". Presumably this keeps generated names stable so the compiled
# output can be compared with the checked-in YAML - TODO confirm.
TektonCompiler._get_unique_id_code = Coder.empty


def PrintOp(name: str, msg: str = None):
    """Instantiate an inline component spec named ``name`` with ``msg``.

    The spec declares one String input (``input_text``) and one String output
    (``output_value``); its container command is ``sh -c`` running
    ``echo $0 > $1`` with the input value and the output path as arguments.

    Args:
        name: Substituted into the ``name`` field of the spec.
        msg: Value passed as ``input_text``; ``name`` is used when it is None.

    Returns:
        The result of calling the loaded component with ``msg``.
    """
    if msg is None:
        msg = name
    # NOTE(review): whitespace inside this YAML literal is part of the runtime
    # string; verify its indentation against the repository copy before editing.
    print_op = load_component_from_text(
        """
name: %s
inputs:
- {name: input_text, type: String, description: 'Represents an input parameter.'}
outputs:
- {name: output_value, type: String, description: 'Represents an output paramter.'}
implementation:
container:
image: alpine:3.6
command:
- sh
- -c
- |
set -e
echo $0 > $1
- {inputValue: input_text}
- {outputPath: output_value}
""" % (name)
    )
    return print_op(msg)


@dsl.pipeline("output_in_range_and_pass")
def output_in_range_and_pass():
    # NOTE(review): documented with comments rather than a docstring - a
    # docstring may be picked up by kfp as the pipeline description and alter
    # the compiled YAML; confirm before adding one.
    #
    # print-0 runs first; its output is the second argument of Loop.range and
    # is also fed to both tasks created inside the loop body.
    head = PrintOp('print-0', "Hello!")
    with Loop.range(1, head.output, step=2):
        PrintOp('print-1', f"print {head.output}")
        artifact_fetcher(path=head.output)


if __name__ == '__main__':
    import os

    # Write the compiled pipeline next to this file with a .yaml extension.
    # str.replace('.py', '.yaml') would rewrite every '.py' occurrence in the
    # path (for example a '.pyenv' directory), so only the extension is swapped.
    TektonCompiler().compile(
        output_in_range_and_pass, os.path.splitext(__file__)[0] + '.yaml')
129 changes: 129 additions & 0 deletions sdk/python/tests/compiler/testdata/nested_loop_counter_param.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: output-in-range-and-pass
annotations:
tekton.dev/output_artifacts: '{"print-0": [{"key": "artifacts/$PIPELINERUN/print-0/output_value.tgz",
"name": "print-0-output_value", "path": "/tmp/outputs/output_value/data"}],
"print-1": [{"key": "artifacts/$PIPELINERUN/print-1/output_value.tgz", "name":
"print-1-output_value", "path": "/tmp/outputs/output_value/data"}]}'
tekton.dev/input_artifacts: '{"print-1": [{"name": "print-0-output_value", "parent_task":
"print-0"}]}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"print-0": [["output_value", "$(results.output-value.path)"]],
"print-1": [["output_value", "$(results.output-value.path)"]]}'
sidecar.istio.io/inject: "false"
tekton.dev/template: ''
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"name": "output_in_range_and_pass"}'
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
spec:
pipelineSpec:
tasks:
- name: print-0
taskSpec:
steps:
- name: main
command:
- sh
- -c
- |
set -e
echo $0 > $1
- Hello!
- $(results.output-value.path)
image: alpine:3.6
results:
- name: output-value
type: string
description: /tmp/outputs/output_value/data
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name": "print-0", "outputs":
[{"description": "Represents an output paramter.", "name": "output_value",
"type": "String"}], "version": "print-0@sha256=5dd3c506ec54281b82008ca8ec5d8142834eae18d74ce8a110a31dd6e371b40d"}'
- runAfter:
- print-0
name: output-in-range-and-pass-for-loop-2
params:
- name: from
value: '1'
- name: print-0-output_value
value: $(tasks.print-0.results.output-value)
- name: step
value: '2'
- name: to
value: $(tasks.print-0.results.output-value)
taskSpec:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
spec:
pipelineSpec:
params:
- name: loop-item-param-1
type: string
- name: print-0-output_value
type: string
tasks:
- name: print-1
params:
- name: print-0-output_value
value: $(params.print-0-output_value)
taskSpec:
steps:
- name: main
command:
- sh
- -c
- |
set -e
echo $0 > $1
- print $(inputs.params.print-0-output_value)
- $(results.output-value.path)
image: alpine:3.6
params:
- name: print-0-output_value
type: string
results:
- name: output-value
type: string
description: /tmp/outputs/output_value/data
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name": "print-1",
"outputs": [{"description": "Represents an output paramter.",
"name": "output_value", "type": "String"}], "version": "print-1@sha256=3b81342bc143f625b58ebdb01e7c83b145880dee807be35c1e16fdb835d46580"}'
- name: artifact-fetcher-0
params:
- name: path
value: $(params.print-0-output_value)
taskRef:
name: artifact_fetcher
apiVersion: fetcher.tekton.dev/v1alpha1
kind: FETCHER
iterateNumeric: loop-item-param-1
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: output-in-range-and-pass
annotations:
tekton.dev/output_artifacts: '{"print-0": [{"key": "artifacts/$PIPELINERUN/print-0/output_value.tgz",
"name": "print-0-output_value", "path": "/tmp/outputs/output_value/data"}],
"print-1": [{"key": "artifacts/$PIPELINERUN/print-1/output_value.tgz", "name":
"print-1-output_value", "path": "/tmp/outputs/output_value/data"}]}'
tekton.dev/input_artifacts: '{"print-1": [{"name": "print-0-output_value", "parent_task":
"print-0"}]}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"print-0": [["output_value", "$(results.output-value.path)"]],
"print-1": [["output_value", "$(results.output-value.path)"]]}'
sidecar.istio.io/inject: "false"
tekton.dev/template: ''
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"name": "output_in_range_and_pass"}'
tekton.dev/resource_templates: '[{"apiVersion": "custom.tekton.dev/v1alpha1",
"kind": "PipelineLoop", "metadata": {"name": "output-in-range-and-pass-for-loop-2"},
"spec": {"iterateNumeric": "loop-item-param-1", "pipelineSpec": {"params": [{"name":
"loop-item-param-1", "type": "string"}, {"name": "print-0-output_value", "type":
"string"}], "tasks": [{"name": "print-1", "params": [{"name": "print-0-output_value",
"value": "$(params.print-0-output_value)"}], "taskSpec": {"metadata": {"annotations":
{"pipelines.kubeflow.org/component_spec_digest": "{\"name\": \"print-1\", \"outputs\":
[{\"description\": \"Represents an output paramter.\", \"name\": \"output_value\",
\"type\": \"String\"}], \"version\": \"print-1@sha256=3b81342bc143f625b58ebdb01e7c83b145880dee807be35c1e16fdb835d46580\"}"},
"labels": {"pipelines.kubeflow.org/cache_enabled": "true"}}, "params": [{"name":
"print-0-output_value", "type": "string"}], "results": [{"description": "/tmp/outputs/output_value/data",
"name": "output-value", "type": "string"}], "steps": [{"command": ["sh", "-c",
"set -e\necho $0 > $1\n", "print $(inputs.params.print-0-output_value)", "$(results.output-value.path)"],
"image": "alpine:3.6", "name": "main"}]}}, {"name": "artifact-fetcher-1", "params":
[{"name": "path", "value": "$(params.print-0-output_value)"}], "taskRef": {"apiVersion":
"fetcher.tekton.dev/v1alpha1", "kind": "FETCHER", "name": "artifact_fetcher"}}]}}}]'
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
spec:
pipelineSpec:
tasks:
- name: print-0
taskSpec:
steps:
- name: main
command:
- sh
- -c
- |
set -e
echo $0 > $1
- Hello!
- $(results.output-value.path)
image: alpine:3.6
results:
- name: output-value
type: string
description: /tmp/outputs/output_value/data
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name": "print-0", "outputs":
[{"description": "Represents an output paramter.", "name": "output_value",
"type": "String"}], "version": "print-0@sha256=5dd3c506ec54281b82008ca8ec5d8142834eae18d74ce8a110a31dd6e371b40d"}'
- runAfter:
- print-0
name: output-in-range-and-pass-for-loop-2
taskRef:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
name: output-in-range-and-pass-for-loop-2
params:
- name: from
value: '1'
- name: print-0-output_value
value: $(tasks.print-0.results.output-value)
- name: step
value: '2'
- name: to
value: $(tasks.print-0.results.output-value)

0 comments on commit 5767673

Please sign in to comment.