From 57676731e335ec70005b2a59b5635b8917dafd5b Mon Sep 17 00:00:00 2001 From: Tommy Li Date: Mon, 14 Nov 2022 15:06:27 -0800 Subject: [PATCH] fix(sdk): fix nested loop counter param bug (#1080) * fix nested loop counter param bug * address comments --- .../kfp_tekton/compiler/_tekton_handler.py | 18 ++- sdk/python/tests/compiler/compiler_tests.py | 7 + .../testdata/nested_loop_counter_param.py | 111 +++++++++++++++ .../testdata/nested_loop_counter_param.yaml | 129 ++++++++++++++++++ .../nested_loop_counter_param_noninlined.yaml | 96 +++++++++++++ 5 files changed, 359 insertions(+), 2 deletions(-) create mode 100644 sdk/python/tests/compiler/testdata/nested_loop_counter_param.py create mode 100644 sdk/python/tests/compiler/testdata/nested_loop_counter_param.yaml create mode 100644 sdk/python/tests/compiler/testdata/nested_loop_counter_param_noninlined.yaml diff --git a/sdk/python/kfp_tekton/compiler/_tekton_handler.py b/sdk/python/kfp_tekton/compiler/_tekton_handler.py index c7641dce1f0..a03b2adb255 100644 --- a/sdk/python/kfp_tekton/compiler/_tekton_handler.py +++ b/sdk/python/kfp_tekton/compiler/_tekton_handler.py @@ -257,10 +257,10 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict, recursive_task custom_task_cr['spec']['iterationNumberParam']] custom_task_cr['spec']['iterateParam'] = custom_task[custom_task_key]['loop_args'] separator = custom_task[custom_task_key].get('separator') + start_end_step_keys = ['from', 'to', 'step'] if separator is not None: custom_task_cr['spec']['iterateParamStringSeparator'] = separator if custom_task[custom_task_key].get('start') is not None: - start_end_step_keys = ['from', 'to', 'step'] custom_task_cr['spec']['pipelineSpec']['params'] = [value for value in custom_task_cr['spec']['pipelineSpec']['params'] if value['name'] not in start_end_step_keys] @@ -270,10 +270,24 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict, recursive_task custom_task_cr['spec']['iterateNumeric'] = custom_task_cr['spec']['iterateParam'] custom_task_cr['spec'].pop('iterateParam') + # check whether or not the nested custom task param reference need to be replaced + custom_task_param_map = {} for custom_task_param in custom_task[custom_task_key]['spec']['params']: if custom_task_param['name'] != custom_task[custom_task_key]['loop_args'] and '$(tasks.' in custom_task_param['value']: + custom_task_param_map.setdefault(custom_task_param['value'], []).append('$(params.%s)' % custom_task_param['name']) + for key, item in custom_task_param_map.items(): + replacement_item = None + if len(item) == 1: + replacement_item = item[0] + if len(item) > 1: + forbidden_keystrings = ['$(params.%s)' % x for x in start_end_step_keys] + for i in item: + if i not in forbidden_keystrings: + replacement_item = i + break + if replacement_item: custom_task_cr = json.loads( - json.dumps(custom_task_cr).replace(custom_task_param['value'], '$(params.%s)' % custom_task_param['name'])) + json.dumps(custom_task_cr).replace(key, replacement_item)) # remove separator from CR params if custom_task[custom_task_key].get('separator') is not None: diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 94444082229..b7c649d61c5 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -190,6 +190,13 @@ def test_recur_cond_workflow(self): from .testdata.tekton_loop_dsl import pipeline self._test_pipeline_workflow(pipeline, 'tekton_loop_dsl.yaml') + def test_nested_loop_counter_param_workflow(self): + """ + Test compiling a loop workflow using tekton nested loop with counter params. + """ + from .testdata.nested_loop_counter_param import output_in_range_and_pass + self._test_pipeline_workflow(output_in_range_and_pass, 'nested_loop_counter_param.yaml') + def test_nested_loop_same_arg_workflow(self): """ Test compiling a nested loop with same argument workflow. diff --git a/sdk/python/tests/compiler/testdata/nested_loop_counter_param.py b/sdk/python/tests/compiler/testdata/nested_loop_counter_param.py new file mode 100644 index 00000000000..1f5f0dfac19 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/nested_loop_counter_param.py @@ -0,0 +1,111 @@ +# Copyright 2022 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools + +from kfp import dsl, components +from kfp.components import load_component_from_text +from kfp_tekton.tekton import TEKTON_CUSTOM_TASK_IMAGES, Loop +from kfp_tekton.compiler import TektonCompiler +import yaml + + +ARTIFACT_FETCHER_IMAGE_NAME = "fetcher/image:latest" +TEKTON_CUSTOM_TASK_IMAGES = TEKTON_CUSTOM_TASK_IMAGES.append(ARTIFACT_FETCHER_IMAGE_NAME) + +_artifact_fetcher_no = 0 + + +def artifact_fetcher(**artifact_paths: str): + '''A containerOp template resolving some artifacts, given their paths.''' + global _artifact_fetcher_no + template_yaml = { + 'name': f'artifact-fetcher-{_artifact_fetcher_no}', + 'description': 'Artifact Fetch', + 'inputs': [ + {'name': name, 'type': 'String'} + for name in artifact_paths.keys() + ], + 'outputs': [ + {'name': name, 'type': 'Artifact'} + for name in artifact_paths.keys() + ], + 'implementation': { + 'container': { + 'image': ARTIFACT_FETCHER_IMAGE_NAME, + 'command': ['sh', '-c'], # irrelevant + 'args': [ + '--apiVersion', 'fetcher.tekton.dev/v1alpha1', + '--kind', 'FETCHER', + '--name', 'artifact_fetcher', + *itertools.chain(*[ + (f'--{name}', {'inputValue': name}) + for name in artifact_paths.keys() + ]) + ] + } + } + } + _artifact_fetcher_no += 1 + template_str = yaml.dump(template_yaml, Dumper=yaml.SafeDumper) + template = components.load_component_from_text(template_str) + op = template(**artifact_paths) + op.add_pod_annotation("valid_container", "false") + return op + + +class Coder: + def empty(self): + return "" + + +TektonCompiler._get_unique_id_code = Coder.empty + + +def PrintOp(name: str, msg: str = None): + if msg is None: + msg = name + print_op = load_component_from_text( + """ + name: %s + inputs: + - {name: input_text, type: String, description: 'Represents an input parameter.'} + outputs: + - {name: output_value, type: String, description: 'Represents an output paramter.'} + implementation: + container: + image: alpine:3.6 + command: + - sh + - -c + - | + set -e + echo $0 > $1 + - {inputValue: input_text} + - {outputPath: output_value} + """ % (name) + ) + return print_op(msg) + + +@dsl.pipeline("output_in_range_and_pass") +def output_in_range_and_pass(): + op0 = PrintOp('print-0', f"Hello!") + with Loop.range(1, op0.output, step=2): + op1 = PrintOp('print-1', f"print {op0.output}") + op2 = artifact_fetcher(path=op0.output) + + +if __name__ == '__main__': + TektonCompiler().compile(output_in_range_and_pass, __file__.replace('.py', '.yaml')) diff --git a/sdk/python/tests/compiler/testdata/nested_loop_counter_param.yaml b/sdk/python/tests/compiler/testdata/nested_loop_counter_param.yaml new file mode 100644 index 00000000000..38557766b48 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/nested_loop_counter_param.yaml @@ -0,0 +1,129 @@ +# Copyright 2021 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: PipelineRun +metadata: + name: output-in-range-and-pass + annotations: + tekton.dev/output_artifacts: '{"print-0": [{"key": "artifacts/$PIPELINERUN/print-0/output_value.tgz", + "name": "print-0-output_value", "path": "/tmp/outputs/output_value/data"}], + "print-1": [{"key": "artifacts/$PIPELINERUN/print-1/output_value.tgz", "name": + "print-1-output_value", "path": "/tmp/outputs/output_value/data"}]}' + tekton.dev/input_artifacts: '{"print-1": [{"name": "print-0-output_value", "parent_task": + "print-0"}]}' + tekton.dev/artifact_bucket: mlpipeline + tekton.dev/artifact_endpoint: minio-service.kubeflow:9000 + tekton.dev/artifact_endpoint_scheme: http:// + tekton.dev/artifact_items: '{"print-0": [["output_value", "$(results.output-value.path)"]], + "print-1": [["output_value", "$(results.output-value.path)"]]}' + sidecar.istio.io/inject: "false" + tekton.dev/template: '' + pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME + pipelines.kubeflow.org/pipeline_spec: '{"name": "output_in_range_and_pass"}' + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' +spec: + pipelineSpec: + tasks: + - name: print-0 + taskSpec: + steps: + - name: main + command: + - sh + - -c + - | + set -e + echo $0 > $1 + - Hello! + - $(results.output-value.path) + image: alpine:3.6 + results: + - name: output-value + type: string + description: /tmp/outputs/output_value/data + metadata: + labels: + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec_digest: '{"name": "print-0", "outputs": + [{"description": "Represents an output paramter.", "name": "output_value", + "type": "String"}], "version": "print-0@sha256=5dd3c506ec54281b82008ca8ec5d8142834eae18d74ce8a110a31dd6e371b40d"}' + - runAfter: + - print-0 + name: output-in-range-and-pass-for-loop-2 + params: + - name: from + value: '1' + - name: print-0-output_value + value: $(tasks.print-0.results.output-value) + - name: step + value: '2' + - name: to + value: $(tasks.print-0.results.output-value) + taskSpec: + apiVersion: custom.tekton.dev/v1alpha1 + kind: PipelineLoop + spec: + pipelineSpec: + params: + - name: loop-item-param-1 + type: string + - name: print-0-output_value + type: string + tasks: + - name: print-1 + params: + - name: print-0-output_value + value: $(params.print-0-output_value) + taskSpec: + steps: + - name: main + command: + - sh + - -c + - | + set -e + echo $0 > $1 + - print $(inputs.params.print-0-output_value) + - $(results.output-value.path) + image: alpine:3.6 + params: + - name: print-0-output_value + type: string + results: + - name: output-value + type: string + description: /tmp/outputs/output_value/data + metadata: + labels: + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec_digest: '{"name": "print-1", + "outputs": [{"description": "Represents an output paramter.", + "name": "output_value", "type": "String"}], "version": "print-1@sha256=3b81342bc143f625b58ebdb01e7c83b145880dee807be35c1e16fdb835d46580"}' + - name: artifact-fetcher-0 + params: + - name: path + value: $(params.print-0-output_value) + taskRef: + name: artifact_fetcher + apiVersion: fetcher.tekton.dev/v1alpha1 + kind: FETCHER + iterateNumeric: loop-item-param-1 + metadata: + labels: + pipelines.kubeflow.org/cache_enabled: "true" diff --git a/sdk/python/tests/compiler/testdata/nested_loop_counter_param_noninlined.yaml b/sdk/python/tests/compiler/testdata/nested_loop_counter_param_noninlined.yaml new file mode 100644 index 00000000000..8585a142170 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/nested_loop_counter_param_noninlined.yaml @@ -0,0 +1,96 @@ +# Copyright 2021 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: PipelineRun +metadata: + name: output-in-range-and-pass + annotations: + tekton.dev/output_artifacts: '{"print-0": [{"key": "artifacts/$PIPELINERUN/print-0/output_value.tgz", + "name": "print-0-output_value", "path": "/tmp/outputs/output_value/data"}], + "print-1": [{"key": "artifacts/$PIPELINERUN/print-1/output_value.tgz", "name": + "print-1-output_value", "path": "/tmp/outputs/output_value/data"}]}' + tekton.dev/input_artifacts: '{"print-1": [{"name": "print-0-output_value", "parent_task": + "print-0"}]}' + tekton.dev/artifact_bucket: mlpipeline + tekton.dev/artifact_endpoint: minio-service.kubeflow:9000 + tekton.dev/artifact_endpoint_scheme: http:// + tekton.dev/artifact_items: '{"print-0": [["output_value", "$(results.output-value.path)"]], + "print-1": [["output_value", "$(results.output-value.path)"]]}' + sidecar.istio.io/inject: "false" + tekton.dev/template: '' + pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME + pipelines.kubeflow.org/pipeline_spec: '{"name": "output_in_range_and_pass"}' + tekton.dev/resource_templates: '[{"apiVersion": "custom.tekton.dev/v1alpha1", + "kind": "PipelineLoop", "metadata": {"name": "output-in-range-and-pass-for-loop-2"}, + "spec": {"iterateNumeric": "loop-item-param-1", "pipelineSpec": {"params": [{"name": + "loop-item-param-1", "type": "string"}, {"name": "print-0-output_value", "type": + "string"}], "tasks": [{"name": "print-1", "params": [{"name": "print-0-output_value", + "value": "$(params.print-0-output_value)"}], "taskSpec": {"metadata": {"annotations": + {"pipelines.kubeflow.org/component_spec_digest": "{\"name\": \"print-1\", \"outputs\": + [{\"description\": \"Represents an output paramter.\", \"name\": \"output_value\", + \"type\": \"String\"}], \"version\": \"print-1@sha256=3b81342bc143f625b58ebdb01e7c83b145880dee807be35c1e16fdb835d46580\"}"}, + "labels": {"pipelines.kubeflow.org/cache_enabled": "true"}}, "params": [{"name": + "print-0-output_value", "type": "string"}], "results": [{"description": "/tmp/outputs/output_value/data", + "name": "output-value", "type": "string"}], "steps": [{"command": ["sh", "-c", + "set -e\necho $0 > $1\n", "print $(inputs.params.print-0-output_value)", "$(results.output-value.path)"], + "image": "alpine:3.6", "name": "main"}]}}, {"name": "artifact-fetcher-1", "params": + [{"name": "path", "value": "$(params.print-0-output_value)"}], "taskRef": {"apiVersion": + "fetcher.tekton.dev/v1alpha1", "kind": "FETCHER", "name": "artifact_fetcher"}}]}}}]' + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' +spec: + pipelineSpec: + tasks: + - name: print-0 + taskSpec: + steps: + - name: main + command: + - sh + - -c + - | + set -e + echo $0 > $1 + - Hello! + - $(results.output-value.path) + image: alpine:3.6 + results: + - name: output-value + type: string + description: /tmp/outputs/output_value/data + metadata: + labels: + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec_digest: '{"name": "print-0", "outputs": + [{"description": "Represents an output paramter.", "name": "output_value", + "type": "String"}], "version": "print-0@sha256=5dd3c506ec54281b82008ca8ec5d8142834eae18d74ce8a110a31dd6e371b40d"}' + - runAfter: + - print-0 + name: output-in-range-and-pass-for-loop-2 + taskRef: + apiVersion: custom.tekton.dev/v1alpha1 + kind: PipelineLoop + name: output-in-range-and-pass-for-loop-2 + params: + - name: from + value: '1' + - name: print-0-output_value + value: $(tasks.print-0.results.output-value) + - name: step + value: '2' + - name: to + value: $(tasks.print-0.results.output-value)