From 26a7d9151766c75d69e1562a91665046f184f337 Mon Sep 17 00:00:00 2001 From: Kevin Bache Date: Tue, 27 Aug 2019 15:10:23 -0700 Subject: [PATCH 01/15] first working commit --- sdk/python/kfp/compiler/compiler.py | 16 ++++++++++++++-- sdk/python/kfp/dsl/_container_op.py | 2 +- sdk/python/kfp/dsl/_for_loop.py | 19 +++++++++++++++---- sdk/python/kfp/dsl/_ops_group.py | 5 ++++- 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 90eca7a0215..8dbd8ff3800 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -1,4 +1,4 @@ -# Copyright 2018-2019 Google LLC + # Copyright 2018-2019 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -506,7 +506,19 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies): "sub_group.loop_args.name: {}.".format(sub_group.loop_args.name)) else: value = '{{inputs.parameters.%s}}' % param_name - task['withItems'] = sub_group.loop_args.to_list_for_task_yaml() + if sub_group.items_is_pipeline_param: + task['withParam'] = sub_group.loop_args.to_list_for_task_yaml() + # TODO: FIXME + if 'dependencies' not in task or not task['dependencies']: + task['dependencies'] = [] + task['dependencies'].append(sub_group.loop_args.items.op_name) + # if pipeline_param.op_name is None: + # return '{{inputs.parameters.%s}}' % pipeline_param.name + # else: + # return '{{tasks.%s.outputs.%s}}' % (pipeline_param.op_name, pipeline_param.name) + + else: + task['withItems'] = sub_group.loop_args.to_list_for_task_yaml() else: value = '{{inputs.parameters.%s}}' % param_name arguments.append({ diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index 87f82872117..549c58160ca 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -962,7 +962,7 @@ def __init__( sidecars: List[Sidecar] = None, container_kwargs: Dict = None, file_outputs: Dict[str, str] = None, - output_artifact_paths : Dict[str, str]=None, + output_artifact_paths: Dict[str, str]=None, artifact_location: V1alpha1ArtifactLocation=None, is_exit_handler=False, pvolumes: Dict[str, V1Volume] = None, diff --git a/sdk/python/kfp/dsl/_for_loop.py b/sdk/python/kfp/dsl/_for_loop.py index 21b08fa45ae..073284f4544 100644 --- a/sdk/python/kfp/dsl/_for_loop.py +++ b/sdk/python/kfp/dsl/_for_loop.py @@ -31,10 +31,10 @@ def __init__(self, items: ItemList, code: Text): """ super().__init__(name=self._make_name(code)) - if not isinstance(items, (list, tuple)): - raise TypeError("Expected list or tuple, got {}.".format(type(items))) + if not isinstance(items, (list, tuple, dsl.PipelineParam)): + raise TypeError("Expected list, tuple, or PipelineParam, got {}.".format(type(items))) - if isinstance(items[0], dict): + if isinstance(items, list) and isinstance(items[0], dict): subvar_names = set(items[0].keys()) for item in items: if not set(item.keys()) == subvar_names: @@ -51,7 +51,18 @@ def __init__(self, items: ItemList, code: Text): self.items = items def to_list_for_task_yaml(self): - return self.items + if isinstance(self.items, list): + return self.items + else: + # pipeline param + pipeline_param = self.items + if pipeline_param.op_name is None: + return '{{inputs.parameters.%s}}' % pipeline_param.name + else: + param_name = '%s-%s' % (pipeline_param.op_name, pipeline_param.name) + return '{{tasks.%s.outputs.parameters.%s}}' % 
(pipeline_param.op_name, param_name)
+        # return '{{tasks.%s.outputs.result}}' % (pipeline_param.op_name, )
+
 
   @classmethod
   def _make_name(cls, code: Text):
diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py
index 6ce7143bc64..7f1aa61cc86 100644
--- a/sdk/python/kfp/dsl/_ops_group.py
+++ b/sdk/python/kfp/dsl/_ops_group.py
@@ -14,7 +14,7 @@
 from typing import Union
 import uuid
 
-from kfp.dsl import _for_loop
+from kfp.dsl import _for_loop, _pipeline_param
 
 from . import _container_op
 from . import _pipeline
@@ -169,6 +169,9 @@ def _get_unique_id_code():
     return uuid.uuid4().hex[:_for_loop.LoopArguments.NUM_CODE_CHARS]
 
   def __init__(self, loop_args: _for_loop.ItemList):
+
+    self.items_is_pipeline_param = isinstance(loop_args, _pipeline_param.PipelineParam)
+
     # random code to id this loop
     code = self._get_unique_id_code()
     group_name = 'for-loop-{}'.format(code)

From 15b5f30327962c0ded68db456161cfd9c4c84304 Mon Sep 17 00:00:00 2001
From: Kevin Bache
Date: Wed, 28 Aug 2019 16:06:58 -0700
Subject: [PATCH 02/15] incremental commit

---
 sdk/python/kfp/compiler/compiler.py           | 54 +++++----
 sdk/python/kfp/dsl/_for_loop.py               |  5 +-
 sdk/python/kfp/dsl/_ops_group.py              |  6 +-
 .../compiler/compiler_withparams_test.py      | 112 ++++++++++++++++++
 4 files changed, 152 insertions(+), 25 deletions(-)
 create mode 100644 sdk/python/tests/compiler/compiler_withparams_test.py

diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py
index 8dbd8ff3800..ecb16c2c263 100644
--- a/sdk/python/kfp/compiler/compiler.py
+++ b/sdk/python/kfp/compiler/compiler.py
@@ -334,22 +334,22 @@ def _get_dependencies(self, pipeline, root_group, op_groups, opsgroups_groups, o
     dependencies = defaultdict(set)
     for op in pipeline.ops.values():
       upstream_op_names = set()
+      print(op.name)
       for param in op.inputs + list(condition_params[op.name]):
         if param.op_name:
           upstream_op_names.add(param.op_name)
       upstream_op_names |= set(op.dependent_names)
 
-      for op_name in upstream_op_names:
+      for upstream_op_name in upstream_op_names:
         # the dependent op could be either a BaseOp or an opsgroup
-        if op_name in pipeline.ops:
-          upstream_op = pipeline.ops[op_name]
-        elif op_name in opsgroups:
-          upstream_op = opsgroups[op_name]
+        if upstream_op_name in pipeline.ops:
+          upstream_op = pipeline.ops[upstream_op_name]
+        elif upstream_op_name in opsgroups:
+          upstream_op = opsgroups[upstream_op_name]
         else:
-          raise ValueError('compiler cannot find the ' + op_name)
+          raise ValueError('compiler cannot find the ' + upstream_op_name)
 
-        upstream_groups, downstream_groups = \
-          self._get_uncommon_ancestors(op_groups, opsgroups_groups, upstream_op, op)
+        upstream_groups, downstream_groups = self._get_uncommon_ancestors(op_groups, opsgroups_groups, upstream_op, op)
         dependencies[downstream_groups[0]].add(upstream_groups[0])
 
   # Generate dependencies based on the recursive opsgroups
@@ -506,25 +506,37 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies):
                 "sub_group.loop_args.name: {}.".format(sub_group.loop_args.name))
           else:
             value = '{{inputs.parameters.%s}}' % param_name
-          if sub_group.items_is_pipeline_param:
-            task['withParam'] = sub_group.loop_args.to_list_for_task_yaml()
-            # TODO: FIXME
-            if 'dependencies' not in task or not task['dependencies']:
-              task['dependencies'] = []
-            task['dependencies'].append(sub_group.loop_args.items.op_name)
-            # if pipeline_param.op_name is None:
-            #   return '{{inputs.parameters.%s}}' % pipeline_param.name
-            # else:
-            #   return '{{tasks.%s.outputs.%s}}' % (pipeline_param.op_name, 
pipeline_param.name) + if sub_group.items_is_pipeline_param: + # these loop args are a 'withParam' rather than 'withItem'. + # i.e., rather than a static list, they are either the output of another task or were input + # as global pipeline parameters + param = sub_group.loop_args + task['withParam'] = param.to_list_for_task_yaml() + # shouldn't be loop-item-param, the manufactured pipeline param on the ParallelFor, + # should be loopidy-doop, the global pipeline param that was passed in to the for loop + + if sub_group.original_pipeline_param.op_name is None: + # the input loop_args to the ParallelFor wasn't produced by another op + # (i.e.: it isn't the output of another task), rather it was a global input pipeline param + param_name = sub_group.original_pipeline_param.name + # value = '{{inputs.parameters.%s}}' % param_name + # value = None + if param.items.op_name is not None: + # these loop args are the output of another task + if 'dependencies' not in task or task['dependencies'] is None: + task['dependencies'] = [] + task['dependencies'].append(param.items.op_name) else: task['withItems'] = sub_group.loop_args.to_list_for_task_yaml() else: value = '{{inputs.parameters.%s}}' % param_name - arguments.append({ + d = { 'name': param_name, - 'value': value, - }) + } + if value is not None: + d['value'] = value + arguments.append(d) arguments.sort(key=lambda x: x['name']) task['arguments'] = {'parameters': arguments} tasks.append(task) diff --git a/sdk/python/kfp/dsl/_for_loop.py b/sdk/python/kfp/dsl/_for_loop.py index 073284f4544..68daaec43e7 100644 --- a/sdk/python/kfp/dsl/_for_loop.py +++ b/sdk/python/kfp/dsl/_for_loop.py @@ -57,12 +57,11 @@ def to_list_for_task_yaml(self): # pipeline param pipeline_param = self.items if pipeline_param.op_name is None: - return '{{inputs.parameters.%s}}' % pipeline_param.name + # return '{{inputs.parameters.%s}}' % pipeline_param.name + return '{{workflow.parameters.%s}}' % pipeline_param.name else: param_name = '%s-%s' % (pipeline_param.op_name, pipeline_param.name) return '{{tasks.%s.outputs.parameters.%s}}' % (pipeline_param.op_name, param_name) - # return '{{tasks.%s.outputs.result}}' % (pipeline_param.op_name, ) - @classmethod def _make_name(cls, code: Text): diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 7f1aa61cc86..dd07d2fe969 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -168,9 +168,13 @@ class ParallelFor(OpsGroup): def _get_unique_id_code(): return uuid.uuid4().hex[:_for_loop.LoopArguments.NUM_CODE_CHARS] - def __init__(self, loop_args: _for_loop.ItemList): + def __init__(self, loop_args: Union[_for_loop.ItemList, _pipeline_param.PipelineParam]): self.items_is_pipeline_param = isinstance(loop_args, _pipeline_param.PipelineParam) + if self.items_is_pipeline_param: + self.original_pipeline_param = loop_args + else: + self.original_pipeline_param = None # random code to id this loop code = self._get_unique_id_code() diff --git a/sdk/python/tests/compiler/compiler_withparams_test.py b/sdk/python/tests/compiler/compiler_withparams_test.py new file mode 100644 index 00000000000..c8160d69959 --- /dev/null +++ b/sdk/python/tests/compiler/compiler_withparams_test.py @@ -0,0 +1,112 @@ +from typing import Iterable, Text + +from kfp.compiler import compiler +from kfp import dsl +import kfp + +# reproducible UUIDs: https://stackoverflow.com/a/56757552/9357327 +import uuid +import random +# ------------------------------------------- +# Remove this block to generate different 
+# UUIDs everytime you run this code. +# This block should be right below the uuid +# import. +rd = random.Random() +rd.seed(0) +uuid.uuid4 = lambda: uuid.UUID(int=rd.getrandbits(128)) +# ------------------------------------------- + +if __name__ == '__main__': + + do_output = True + + if do_output: + @dsl.pipeline(name='my-pipeline', description='A pipeline with multiple pipeline params.') + def pipeline(my_pipe_param=10): + op0 = dsl.ContainerOp( + name="my-out-cop0", + image='python:alpine3.6', + command=["sh", "-c"], + arguments=['python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'], + file_outputs={'out': '/tmp/out.json'}, + ) + + # loop_args = [{'a': 1, 'b': 2}, {'a': 10, 'b': 20}] + with dsl.ParallelFor(op0.output) as item: + op1 = dsl.ContainerOp( + name="my-in-cop1", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo op1 %s" % item], + ) + + op_out = dsl.ContainerOp( + name="my-out-cop2", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo %s" % op0.output], + ) + + yaml_text = compiler.Compiler().compile(pipeline, None) + print(yaml_text) + + import kfp + import time + client = kfp.Client(host='127.0.0.1:8080/pipeline') + print(client.list_experiments()) + + pkg_path = '/tmp/witest_pkg.tar.gz' + compiler.Compiler().compile(pipeline, package_path=pkg_path) + exp = client.create_experiment('withitems_exp') + client.run_pipeline( + experiment_id=exp.id, + job_name=f'withitems_job_{time.time()}', + pipeline_package_path=pkg_path, + params={'my-pipe-param': 11}, + ) + + +# if __name__ == '__main__': +# @dsl.pipeline(name='my-pipeline', description='A pipeline with multiple pipeline params.') +# def pipeline(loopidy_doop=[3, 5, 7, 9]): +# op0 = dsl.ContainerOp( +# name="my-out-cop0", +# image='python:alpine3.6', +# command=["sh", "-c"], +# arguments=['python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'], +# file_outputs={'out': '/tmp/out.json'}, +# ) +# +# with dsl.ParallelFor(loopidy_doop) as item: +# op1 = dsl.ContainerOp( +# name="my-in-cop1", +# image="library/bash:4.4.23", +# command=["sh", "-c"], +# arguments=["echo op1 %s" % item], +# ).after(op0) +# +# op_out = dsl.ContainerOp( +# name="my-out-cop2", +# image="library/bash:4.4.23", +# command=["sh", "-c"], +# arguments=["echo %s" % op0.output], +# ) +# +# yaml_text = compiler.Compiler().compile(pipeline, None) +# print(yaml_text) +# +# import kfp +# import time +# client = kfp.Client(host='127.0.0.1:8080/pipeline') +# print(client.list_experiments()) +# +# pkg_path = '/tmp/witest_pkg.tar.gz' +# compiler.Compiler().compile(pipeline, package_path=pkg_path) +# exp = client.create_experiment('withitems_exp') +# client.run_pipeline( +# experiment_id=exp.id, +# job_name=f'withitems_job_{time.time()}', +# pipeline_package_path=pkg_path, +# ) +# From 820889e2f542df863dc2d677b0660e0cf0401368 Mon Sep 17 00:00:00 2001 From: Kevin Bache Date: Wed, 28 Aug 2019 18:40:04 -0700 Subject: [PATCH 03/15] in the middle of converting loop args constructor to accept pipeline param --- sdk/python/kfp/compiler/compiler.py | 14 ++------- sdk/python/kfp/dsl/_for_loop.py | 29 ++++++++++--------- sdk/python/kfp/dsl/_ops_group.py | 12 +++----- .../compiler/compiler_withparams_test.py | 27 ++++++++++++++++- 4 files changed, 49 insertions(+), 33 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index ecb16c2c263..284d9b50a76 100644 --- 
a/sdk/python/kfp/compiler/compiler.py
+++ b/sdk/python/kfp/compiler/compiler.py
@@ -511,22 +511,14 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies):
           # these loop args are a 'withParam' rather than 'withItem'.
           # i.e., rather than a static list, they are either the output of another task or were input
           # as global pipeline parameters
-          param = sub_group.loop_args
-          task['withParam'] = param.to_list_for_task_yaml()
+          task['withParam'] = sub_group.loop_args.to_list_for_task_yaml()
           # shouldn't be loop-item-param, the manufactured pipeline param on the ParallelFor,
           # should be loopidy-doop, the global pipeline param that was passed in to the for loop
-
-          if sub_group.original_pipeline_param.op_name is None:
-            # the input loop_args to the ParallelFor wasn't produced by another op
-            # (i.e.: it isn't the output of another task), rather it was a global input pipeline param
-            param_name = sub_group.original_pipeline_param.name
-            # value = '{{inputs.parameters.%s}}' % param_name
-            # value = None
-          if param.items.op_name is not None:
+          if sub_group.loop_args.op_name is not None:
             # these loop args are the output of another task
             if 'dependencies' not in task or task['dependencies'] is None:
               task['dependencies'] = []
-            task['dependencies'].append(param.items.op_name)
+            task['dependencies'].append(sub_group.loop_args.op_name)
         else:
           task['withItems'] = sub_group.loop_args.to_list_for_task_yaml()
       else:
diff --git a/sdk/python/kfp/dsl/_for_loop.py b/sdk/python/kfp/dsl/_for_loop.py
index 68daaec43e7..4bb457e0d76 100644
--- a/sdk/python/kfp/dsl/_for_loop.py
+++ b/sdk/python/kfp/dsl/_for_loop.py
@@ -1,5 +1,5 @@
 import re
-from typing import List, Union, Dict, Text, Any, Tuple
+from typing import List, Union, Dict, Text, Any, Tuple, Optional
 
 from kfp import dsl
 
@@ -19,7 +19,7 @@ class LoopArguments(dsl.PipelineParam):
   def _subvar_name_is_legal(cls, proposed_variable_name: Text):
     return re.match(cls.LEGAL_SUBVAR_NAME_REGEX, proposed_variable_name) is not None
 
-  def __init__(self, items: ItemList, code: Text):
+  def __init__(self, items: Union[ItemList, dsl.PipelineParam], code: Text, name_override: Optional[Text]=None, op_name: Optional[Text]=None):
     """_LoopArguments represents the set of items to loop over in a ParallelFor loop. This class shouldn't be
     instantiated by the user but rather is created by _ops_group.ParallelFor.
 
@@ -29,7 +29,10 @@ def __init__(self, items: ItemList, code: Text):
       code: A unique code used to identify these loop arguments. Should match the code for the ParallelFor
         ops_group which created these _LoopArguments. This prevents parameter name collisions.
     """
-    super().__init__(name=self._make_name(code))
+    if name_override is None:
+      super().__init__(name=self._make_name(code))
+    else:
+      super().__init__(name=name_override, op_name=op_name)
 
     if not isinstance(items, (list, tuple, dsl.PipelineParam)):
       raise TypeError("Expected list, tuple, or PipelineParam, got {}.".format(type(items)))
@@ -41,21 +44,21 @@ def __init__(self, items: ItemList, code: Text):
       raise ValueError("If you input a list of dicts then all dicts should have the same keys. 
" "Got: {}.".format(items)) - # then this block creates loop_args.variable_a and loop_args.variable_b - for subvar_name in subvar_names: - if not self._subvar_name_is_legal(subvar_name): - raise ValueError("Tried to create subvariable named {} but that's not a legal Python variable " - "name.".format(subvar_name)) - setattr(self, subvar_name, LoopArgumentVariable(self.name, subvar_name)) + self.items_or_pipeline_param = items + self.referenced_subvar_names = [] - self.items = items + def __getattr__(self, item): + # this is being overridden so that we can access subvariables of the LoopArguments (i.e.: item.a) without + # knowing the subvariable names ahead of time + self.referenced_subvar_names.append(item) + return LoopArgumentVariable(self.name, item) def to_list_for_task_yaml(self): - if isinstance(self.items, list): - return self.items + if isinstance(self.items_or_pipeline_param, list): + return self.items_or_pipeline_param else: # pipeline param - pipeline_param = self.items + pipeline_param = self.items_or_pipeline_param if pipeline_param.op_name is None: # return '{{inputs.parameters.%s}}' % pipeline_param.name return '{{workflow.parameters.%s}}' % pipeline_param.name diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index dd07d2fe969..108e8482bee 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import json from typing import Union import uuid @@ -169,19 +170,14 @@ def _get_unique_id_code(): return uuid.uuid4().hex[:_for_loop.LoopArguments.NUM_CODE_CHARS] def __init__(self, loop_args: Union[_for_loop.ItemList, _pipeline_param.PipelineParam]): - - self.items_is_pipeline_param = isinstance(loop_args, _pipeline_param.PipelineParam) - if self.items_is_pipeline_param: - self.original_pipeline_param = loop_args - else: - self.original_pipeline_param = None - # random code to id this loop code = self._get_unique_id_code() group_name = 'for-loop-{}'.format(code) super().__init__(self.TYPE_NAME, name=group_name) - if not isinstance(loop_args, _for_loop.LoopArguments): + self.items_is_pipeline_param = isinstance(loop_args, _pipeline_param.PipelineParam) + if not self.items_is_pipeline_param and not isinstance(loop_args, _for_loop.LoopArguments): + # we were passed a raw list, wrap it in loop args loop_args = _for_loop.LoopArguments(loop_args, code) self.loop_args = loop_args diff --git a/sdk/python/tests/compiler/compiler_withparams_test.py b/sdk/python/tests/compiler/compiler_withparams_test.py index c8160d69959..7b9da2bfbb5 100644 --- a/sdk/python/tests/compiler/compiler_withparams_test.py +++ b/sdk/python/tests/compiler/compiler_withparams_test.py @@ -19,7 +19,7 @@ if __name__ == '__main__': - do_output = True + do_output = False if do_output: @dsl.pipeline(name='my-pipeline', description='A pipeline with multiple pipeline params.') @@ -47,6 +47,31 @@ def pipeline(my_pipe_param=10): command=["sh", "-c"], arguments=["echo %s" % op0.output], ) + else: + @dsl.pipeline(name='my-pipeline', description='A pipeline with multiple pipeline params.') + def pipeline(loopidy_doop=[3, 5, 7, 9]): + op0 = dsl.ContainerOp( + name="my-out-cop0", + image='python:alpine3.6', + command=["sh", "-c"], + arguments=['python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'], + file_outputs={'out': 
'/tmp/out.json'}, + ) + + with dsl.ParallelFor(loopidy_doop) as item: + op1 = dsl.ContainerOp( + name="my-in-cop1", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo op1 %s" % item.a], + ).after(op0) + + op_out = dsl.ContainerOp( + name="my-out-cop2", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo %s" % op0.output], + ) yaml_text = compiler.Compiler().compile(pipeline, None) print(yaml_text) From 1001779ddd8abfde9f6bcf337612a7fb25ccd2e3 Mon Sep 17 00:00:00 2001 From: Kevin Bache Date: Tue, 3 Sep 2019 14:50:42 -0700 Subject: [PATCH 04/15] both cases working --- sdk/python/kfp/compiler/compiler.py | 15 ++-- sdk/python/kfp/dsl/_ops_group.py | 9 +- .../compiler/compiler_withparams_test.py | 86 ++++++++----------- 3 files changed, 48 insertions(+), 62 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index ecb16c2c263..4ace3b4056f 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -515,13 +515,14 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies): task['withParam'] = param.to_list_for_task_yaml() # shouldn't be loop-item-param, the manufactured pipeline param on the ParallelFor, # should be loopidy-doop, the global pipeline param that was passed in to the for loop - - if sub_group.original_pipeline_param.op_name is None: - # the input loop_args to the ParallelFor wasn't produced by another op - # (i.e.: it isn't the output of another task), rather it was a global input pipeline param - param_name = sub_group.original_pipeline_param.name - # value = '{{inputs.parameters.%s}}' % param_name - # value = None + # + # if sub_group.original_pipeline_param.op_name is None: + # # the input loop_args to the ParallelFor wasn't produced by another op + # # (i.e.: it isn't the output of another task), rather it was a global input pipeline param + # param_name = + # # param_name = sub_group.original_pipeline_param.name + # # value = '{{inputs.parameters.%s}}' % param_name + # # value = None if param.items.op_name is not None: # these loop args are the output of another task if 'dependencies' not in task or task['dependencies'] is None: diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index dd07d2fe969..eb2d8137a19 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -169,12 +169,11 @@ def _get_unique_id_code(): return uuid.uuid4().hex[:_for_loop.LoopArguments.NUM_CODE_CHARS] def __init__(self, loop_args: Union[_for_loop.ItemList, _pipeline_param.PipelineParam]): - self.items_is_pipeline_param = isinstance(loop_args, _pipeline_param.PipelineParam) - if self.items_is_pipeline_param: - self.original_pipeline_param = loop_args - else: - self.original_pipeline_param = None + # if self.items_is_pipeline_param: + # self.original_pipeline_param = loop_args + # else: + # self.original_pipeline_param = None # random code to id this loop code = self._get_unique_id_code() diff --git a/sdk/python/tests/compiler/compiler_withparams_test.py b/sdk/python/tests/compiler/compiler_withparams_test.py index c8160d69959..ce391e02573 100644 --- a/sdk/python/tests/compiler/compiler_withparams_test.py +++ b/sdk/python/tests/compiler/compiler_withparams_test.py @@ -1,3 +1,4 @@ +import time from typing import Iterable, Text from kfp.compiler import compiler @@ -18,12 +19,11 @@ # ------------------------------------------- if __name__ == '__main__': - do_output = True if do_output: 
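        # do_output=True exercises looping over op0's output (withParam drawn
        # from a '{{tasks...}}' reference); the else branch below loops over the
        # global pipeline param 'loopidy_doop' (withParam drawn from a
        # '{{workflow.parameters...}}' reference).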
@dsl.pipeline(name='my-pipeline', description='A pipeline with multiple pipeline params.') - def pipeline(my_pipe_param=10): + def pipeline(): op0 = dsl.ContainerOp( name="my-out-cop0", image='python:alpine3.6', @@ -48,6 +48,37 @@ def pipeline(my_pipe_param=10): arguments=["echo %s" % op0.output], ) + job_name = f'withparams_passed_param_{time.time()}' + params = {} + else: + @dsl.pipeline(name='my-pipeline', description='A pipeline with multiple pipeline params.') + def pipeline(loopidy_doop=[3, 5, 7, 9]): + op0 = dsl.ContainerOp( + name="my-out-cop0", + image='python:alpine3.6', + command=["sh", "-c"], + arguments=['python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'], + file_outputs={'out': '/tmp/out.json'}, + ) + + with dsl.ParallelFor(loopidy_doop) as item: + op1 = dsl.ContainerOp( + name="my-in-cop1", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo op1 %s" % item], + ).after(op0) + + op_out = dsl.ContainerOp( + name="my-out-cop2", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo %s" % op0.output], + ).after(op1) + + job_name = f'withparams_global_param_{time.time()}' + params = {} + yaml_text = compiler.Compiler().compile(pipeline, None) print(yaml_text) @@ -58,55 +89,10 @@ def pipeline(my_pipe_param=10): pkg_path = '/tmp/witest_pkg.tar.gz' compiler.Compiler().compile(pipeline, package_path=pkg_path) - exp = client.create_experiment('withitems_exp') + exp = client.create_experiment('withparams_exp') client.run_pipeline( experiment_id=exp.id, - job_name=f'withitems_job_{time.time()}', + job_name=job_name, pipeline_package_path=pkg_path, - params={'my-pipe-param': 11}, + params=params, ) - - -# if __name__ == '__main__': -# @dsl.pipeline(name='my-pipeline', description='A pipeline with multiple pipeline params.') -# def pipeline(loopidy_doop=[3, 5, 7, 9]): -# op0 = dsl.ContainerOp( -# name="my-out-cop0", -# image='python:alpine3.6', -# command=["sh", "-c"], -# arguments=['python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'], -# file_outputs={'out': '/tmp/out.json'}, -# ) -# -# with dsl.ParallelFor(loopidy_doop) as item: -# op1 = dsl.ContainerOp( -# name="my-in-cop1", -# image="library/bash:4.4.23", -# command=["sh", "-c"], -# arguments=["echo op1 %s" % item], -# ).after(op0) -# -# op_out = dsl.ContainerOp( -# name="my-out-cop2", -# image="library/bash:4.4.23", -# command=["sh", "-c"], -# arguments=["echo %s" % op0.output], -# ) -# -# yaml_text = compiler.Compiler().compile(pipeline, None) -# print(yaml_text) -# -# import kfp -# import time -# client = kfp.Client(host='127.0.0.1:8080/pipeline') -# print(client.list_experiments()) -# -# pkg_path = '/tmp/witest_pkg.tar.gz' -# compiler.Compiler().compile(pipeline, package_path=pkg_path) -# exp = client.create_experiment('withitems_exp') -# client.run_pipeline( -# experiment_id=exp.id, -# job_name=f'withitems_job_{time.time()}', -# pipeline_package_path=pkg_path, -# ) -# From f5e466474fdb8d0bb81c2de4568a6eb5e9d49045 Mon Sep 17 00:00:00 2001 From: Kevin Bache Date: Tue, 3 Sep 2019 17:56:21 -0700 Subject: [PATCH 05/15] output works, passed doesn't --- sdk/python/kfp/compiler/compiler.py | 45 +++++++++---------- sdk/python/kfp/dsl/_for_loop.py | 10 +---- sdk/python/kfp/dsl/_ops_group.py | 6 +-- .../compiler/compiler_withparams_test.py | 25 ----------- 4 files changed, 25 insertions(+), 61 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py 
b/sdk/python/kfp/compiler/compiler.py index f934bca211e..32ce70022d6 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -498,44 +498,43 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies): if sub_group.loop_args.name in param_name: if _for_loop.LoopArgumentVariable.name_is_loop_arguments_variable(param_name): subvar_name = _for_loop.LoopArgumentVariable.get_subvar_name(param_name) - value = '{{item.%s}}' % subvar_name - elif _for_loop.LoopArguments.name_is_loop_arguments(param_name): - value = '{{item}}' + param_value = '{{item.%s}}' % subvar_name + elif _for_loop.LoopArguments.name_is_loop_arguments(param_name) or sub_group.items_is_pipeline_param: + param_value = '{{item}}' else: - raise ValueError("Failed to match loop args with param. param_name: {}, ".format(param_name) + - "sub_group.loop_args.name: {}.".format(sub_group.loop_args.name)) + raise ValueError("Failed to match loop args with parameter. param_name: {}, ".format(param_name)) else: - value = '{{inputs.parameters.%s}}' % param_name + param_value = '{{inputs.parameters.%s}}' % param_name + # withParam vs withItems if sub_group.items_is_pipeline_param: - # these loop args are a 'withParam' rather than 'withItem'. + # these loop args are a 'withParam' rather than 'withItems'. # i.e., rather than a static list, they are either the output of another task or were input # as global pipeline parameters - task['withParam'] = sub_group.loop_args.to_list_for_task_yaml() - # shouldn't be loop-item-param, the manufactured pipeline param on the ParallelFor, - # should be loopidy-doop, the global pipeline param that was passed in to the for loop - # - # if sub_group.original_pipeline_param.op_name is None: - # # the input loop_args to the ParallelFor wasn't produced by another op - # # (i.e.: it isn't the output of another task), rather it was a global input pipeline param - # param_name = - # # param_name = sub_group.original_pipeline_param.name - # # value = '{{inputs.parameters.%s}}' % param_name - # # value = None - if param.items.op_name is not None: + + pipeline_param = sub_group.loop_args + if pipeline_param.op_name is None: + withparam_value = '{{workflow.parameters.%s}}' % pipeline_param.name + else: + param_name = '%s-%s' % (pipeline_param.op_name, pipeline_param.name) + withparam_value = '{{tasks.%s.outputs.parameters.%s}}' % (pipeline_param.op_name, param_name) + # these loop args are the output of another task if 'dependencies' not in task or task['dependencies'] is None: task['dependencies'] = [] - task['dependencies'].append(sub_group.loop_args.op_name) + task['dependencies'].append(pipeline_param.op_name) + + task['withParam'] = withparam_value else: task['withItems'] = sub_group.loop_args.to_list_for_task_yaml() else: - value = '{{inputs.parameters.%s}}' % param_name + # subgroup is not a ParallelFor + param_value = '{{inputs.parameters.%s}}' % param_name d = { 'name': param_name, } - if value is not None: - d['value'] = value + if param_value is not None: + d['value'] = param_value arguments.append(d) arguments.sort(key=lambda x: x['name']) task['arguments'] = {'parameters': arguments} diff --git a/sdk/python/kfp/dsl/_for_loop.py b/sdk/python/kfp/dsl/_for_loop.py index 4bb457e0d76..6f5e60907fa 100644 --- a/sdk/python/kfp/dsl/_for_loop.py +++ b/sdk/python/kfp/dsl/_for_loop.py @@ -57,14 +57,8 @@ def to_list_for_task_yaml(self): if isinstance(self.items_or_pipeline_param, list): return self.items_or_pipeline_param else: - # pipeline param - pipeline_param = 
self.items_or_pipeline_param - if pipeline_param.op_name is None: - # return '{{inputs.parameters.%s}}' % pipeline_param.name - return '{{workflow.parameters.%s}}' % pipeline_param.name - else: - param_name = '%s-%s' % (pipeline_param.op_name, pipeline_param.name) - return '{{tasks.%s.outputs.parameters.%s}}' % (pipeline_param.op_name, param_name) + raise ValueError("You should only call this method on loop args which have list items, " + "not pipeline param items.") @classmethod def _make_name(cls, code: Text): diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 0618d39dcda..c58dce5760a 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -171,12 +171,8 @@ def _get_unique_id_code(): def __init__(self, loop_args: Union[_for_loop.ItemList, _pipeline_param.PipelineParam]): self.items_is_pipeline_param = isinstance(loop_args, _pipeline_param.PipelineParam) - # if self.items_is_pipeline_param: - # self.original_pipeline_param = loop_args - # else: - # self.original_pipeline_param = None - # random code to id this loop + # use a random code to uniquely identify this loop code = self._get_unique_id_code() group_name = 'for-loop-{}'.format(code) super().__init__(self.TYPE_NAME, name=group_name) diff --git a/sdk/python/tests/compiler/compiler_withparams_test.py b/sdk/python/tests/compiler/compiler_withparams_test.py index 753a0f7183c..ce391e02573 100644 --- a/sdk/python/tests/compiler/compiler_withparams_test.py +++ b/sdk/python/tests/compiler/compiler_withparams_test.py @@ -47,31 +47,6 @@ def pipeline(): command=["sh", "-c"], arguments=["echo %s" % op0.output], ) - else: - @dsl.pipeline(name='my-pipeline', description='A pipeline with multiple pipeline params.') - def pipeline(loopidy_doop=[3, 5, 7, 9]): - op0 = dsl.ContainerOp( - name="my-out-cop0", - image='python:alpine3.6', - command=["sh", "-c"], - arguments=['python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'], - file_outputs={'out': '/tmp/out.json'}, - ) - - with dsl.ParallelFor(loopidy_doop) as item: - op1 = dsl.ContainerOp( - name="my-in-cop1", - image="library/bash:4.4.23", - command=["sh", "-c"], - arguments=["echo op1 %s" % item.a], - ).after(op0) - - op_out = dsl.ContainerOp( - name="my-out-cop2", - image="library/bash:4.4.23", - command=["sh", "-c"], - arguments=["echo %s" % op0.output], - ) job_name = f'withparams_passed_param_{time.time()}' params = {} From 7e3b4c7877db911e570b501ef34de965b62005b6 Mon Sep 17 00:00:00 2001 From: Kevin Bache Date: Wed, 4 Sep 2019 17:24:18 -0700 Subject: [PATCH 06/15] about to redo compiler section --- sdk/python/kfp/compiler/compiler.py | 7 +++++++ sdk/python/kfp/dsl/_ops_group.py | 4 ++-- .../compiler/compiler_withparams_test.py | 19 +++++++++---------- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 32ce70022d6..94e551586da 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -433,6 +433,7 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies): # Generate tasks section. 
tasks = [] for sub_group in group.groups + group.ops: + print(f"sub_group.name = {sub_group.name}") is_recursive_subgroup = (isinstance(sub_group, OpsGroup) and sub_group.recursive_ref) # Special handling for recursive subgroup: use the existing opsgroup name if is_recursive_subgroup: @@ -463,6 +464,7 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies): arguments = [] for param_name, dependent_name in inputs[sub_group.name]: if dependent_name: + print(f'in dependent_name for {sub_group.name}') # The value comes from an upstream sibling. # Special handling for recursive subgroup: argument name comes from the existing opsgroup if is_recursive_subgroup: @@ -494,6 +496,7 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies): 'value': '{{inputs.parameters.%s}}' % param_name }) else: + print(f'about to check parallefor for {sub_group.name}') if isinstance(sub_group, dsl.ParallelFor): if sub_group.loop_args.name in param_name: if _for_loop.LoopArgumentVariable.name_is_loop_arguments_variable(param_name): @@ -506,6 +509,10 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies): else: param_value = '{{inputs.parameters.%s}}' % param_name + ################### + # OLD SPOT + ################### + # withParam vs withItems if sub_group.items_is_pipeline_param: # these loop args are a 'withParam' rather than 'withItems'. diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index c58dce5760a..588d7b34830 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -10,7 +10,6 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License. import json from typing import Union import uuid @@ -19,7 +18,7 @@ from . import _container_op from . import _pipeline -from ._pipeline_param import ConditionOperator + class OpsGroup(object): """Represents a logical group of ops and group of OpsGroups. @@ -94,6 +93,7 @@ def after(self, dependency): self.dependencies.append(dependency) return self + class ExitHandler(OpsGroup): """Represents an exit handler that is invoked upon exiting a group of ops. 
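[Note: the two looping modes being wired up across these commits can be sketched as follows. This is a minimal usage sketch against the DSL as of this branch; the `loop-modes` pipeline and op names are illustrative only, not part of the patch. A static Python list compiles to Argo's withItems, while a PipelineParam (a global pipeline input or another task's output) compiles to withParam:

    import kfp.dsl as dsl

    @dsl.pipeline(name='loop-modes')
    def loop_modes(global_items=[3, 5, 7, 9]):
        # Static list of dicts -> compiled to 'withItems' with the list inlined;
        # item.a binds to '{{item.a}}' via LoopArguments' subvariable handling.
        with dsl.ParallelFor([{'a': 1}, {'a': 2}]) as item:
            dsl.ContainerOp(
                name='echo-static', image='library/bash:4.4.23',
                command=['sh', '-c'], arguments=['echo %s' % item.a])

        # PipelineParam -> compiled to 'withParam'; a global input resolves to
        # '{{workflow.parameters.global-items}}', a task output to
        # '{{tasks.<op>.outputs.parameters.<op>-<out>}}'.
        with dsl.ParallelFor(global_items) as item:
            dsl.ContainerOp(
                name='echo-param', image='library/bash:4.4.23',
                command=['sh', '-c'], arguments=['echo %s' % item])
]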
diff --git a/sdk/python/tests/compiler/compiler_withparams_test.py b/sdk/python/tests/compiler/compiler_withparams_test.py index ce391e02573..693a6e2c5eb 100644 --- a/sdk/python/tests/compiler/compiler_withparams_test.py +++ b/sdk/python/tests/compiler/compiler_withparams_test.py @@ -3,7 +3,6 @@ from kfp.compiler import compiler from kfp import dsl -import kfp # reproducible UUIDs: https://stackoverflow.com/a/56757552/9357327 import uuid @@ -22,7 +21,7 @@ do_output = True if do_output: - @dsl.pipeline(name='my-pipeline', description='A pipeline with multiple pipeline params.') + @dsl.pipeline(name='my-pipeline') def pipeline(): op0 = dsl.ContainerOp( name="my-out-cop0", @@ -38,20 +37,20 @@ def pipeline(): name="my-in-cop1", image="library/bash:4.4.23", command=["sh", "-c"], - arguments=["echo op1 %s" % item], + arguments=["echo do output op1 item: %s" % item], ) op_out = dsl.ContainerOp( name="my-out-cop2", image="library/bash:4.4.23", command=["sh", "-c"], - arguments=["echo %s" % op0.output], + arguments=["echo do output op2, outp: %s" % op0.output], ) - job_name = f'withparams_passed_param_{time.time()}' + job_name = f'do-output=TRUE-passed-{time.time()}' params = {} else: - @dsl.pipeline(name='my-pipeline', description='A pipeline with multiple pipeline params.') + @dsl.pipeline(name='my-pipeline') def pipeline(loopidy_doop=[3, 5, 7, 9]): op0 = dsl.ContainerOp( name="my-out-cop0", @@ -66,17 +65,17 @@ def pipeline(loopidy_doop=[3, 5, 7, 9]): name="my-in-cop1", image="library/bash:4.4.23", command=["sh", "-c"], - arguments=["echo op1 %s" % item], + arguments=["echo no output global op1, item: %s" % item], ).after(op0) op_out = dsl.ContainerOp( name="my-out-cop2", image="library/bash:4.4.23", command=["sh", "-c"], - arguments=["echo %s" % op0.output], - ).after(op1) + arguments=["echo no output global op2, outp: %s" % op0.output], + ) - job_name = f'withparams_global_param_{time.time()}' + job_name = f'do-output=FALSE-global-{time.time()}' params = {} yaml_text = compiler.Compiler().compile(pipeline, None) From a17840316b668c6f96546c3be60d96f7165bf70f Mon Sep 17 00:00:00 2001 From: Kevin Bache Date: Wed, 4 Sep 2019 18:01:18 -0700 Subject: [PATCH 07/15] rewrite draft done --- sdk/python/kfp/compiler/compiler.py | 150 ++++++++---------- .../compiler/compiler_withparams_test.py | 3 +- 2 files changed, 67 insertions(+), 86 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 94e551586da..3c90a9bb05b 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -17,7 +17,7 @@ import inspect import tarfile import zipfile -from typing import Set, List, Text, Dict +from typing import Set, List, Text, Dict, Tuple, Any, Union, Optional import yaml from kfp.dsl import _container_op, _for_loop @@ -461,95 +461,77 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies): # Generate arguments section for this task. if inputs.get(sub_group.name, None): - arguments = [] - for param_name, dependent_name in inputs[sub_group.name]: - if dependent_name: - print(f'in dependent_name for {sub_group.name}') - # The value comes from an upstream sibling. 
- # Special handling for recursive subgroup: argument name comes from the existing opsgroup - if is_recursive_subgroup: - for index, input in enumerate(sub_group.inputs): - if param_name == self._pipelineparam_full_name(input): - break - referenced_input = sub_group.recursive_ref.inputs[index] - full_name = self._pipelineparam_full_name(referenced_input) - arguments.append({ - 'name': full_name, - 'value': '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name) - }) - else: - arguments.append({ - 'name': param_name, - 'value': '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name) - }) + task['arguments'] = {'parameters': self.get_arguments_for_sub_group(sub_group, is_recursive_subgroup, inputs)} + + # additional task modifications for withItems and withParam + if isinstance(sub_group, dsl.ParallelFor): + if sub_group.items_is_pipeline_param: + # these loop args are a 'withParam' rather than 'withItems'. + # i.e., rather than a static list, they are either the output of another task or were input + # as global pipeline parameters + + pipeline_param = sub_group.loop_args + if pipeline_param.op_name is None: + withparam_value = '{{workflow.parameters.%s}}' % pipeline_param.name else: - # The value comes from its parent. - # Special handling for recursive subgroup: argument name comes from the existing opsgroup - if is_recursive_subgroup: - for index, input in enumerate(sub_group.inputs): - if param_name == self._pipelineparam_full_name(input): - break - referenced_input = sub_group.recursive_ref.inputs[index] - full_name = self._pipelineparam_full_name(referenced_input) - arguments.append({ - 'name': full_name, - 'value': '{{inputs.parameters.%s}}' % param_name - }) - else: - print(f'about to check parallefor for {sub_group.name}') - if isinstance(sub_group, dsl.ParallelFor): - if sub_group.loop_args.name in param_name: - if _for_loop.LoopArgumentVariable.name_is_loop_arguments_variable(param_name): - subvar_name = _for_loop.LoopArgumentVariable.get_subvar_name(param_name) - param_value = '{{item.%s}}' % subvar_name - elif _for_loop.LoopArguments.name_is_loop_arguments(param_name) or sub_group.items_is_pipeline_param: - param_value = '{{item}}' - else: - raise ValueError("Failed to match loop args with parameter. param_name: {}, ".format(param_name)) - else: - param_value = '{{inputs.parameters.%s}}' % param_name - - ################### - # OLD SPOT - ################### - - # withParam vs withItems - if sub_group.items_is_pipeline_param: - # these loop args are a 'withParam' rather than 'withItems'. 
- # i.e., rather than a static list, they are either the output of another task or were input - # as global pipeline parameters - - pipeline_param = sub_group.loop_args - if pipeline_param.op_name is None: - withparam_value = '{{workflow.parameters.%s}}' % pipeline_param.name - else: - param_name = '%s-%s' % (pipeline_param.op_name, pipeline_param.name) - withparam_value = '{{tasks.%s.outputs.parameters.%s}}' % (pipeline_param.op_name, param_name) - - # these loop args are the output of another task - if 'dependencies' not in task or task['dependencies'] is None: - task['dependencies'] = [] - task['dependencies'].append(pipeline_param.op_name) - - task['withParam'] = withparam_value - else: - task['withItems'] = sub_group.loop_args.to_list_for_task_yaml() - else: - # subgroup is not a ParallelFor - param_value = '{{inputs.parameters.%s}}' % param_name - d = { - 'name': param_name, - } - if param_value is not None: - d['value'] = param_value - arguments.append(d) - arguments.sort(key=lambda x: x['name']) - task['arguments'] = {'parameters': arguments} + param_name = '%s-%s' % (pipeline_param.op_name, pipeline_param.name) + withparam_value = '{{tasks.%s.outputs.parameters.%s}}' % (pipeline_param.op_name, param_name) + + # these loop args are the output of another task + if 'dependencies' not in task or task['dependencies'] is None: + task['dependencies'] = [] + if pipeline_param.op_name not in task['dependencies']: + task['dependencies'].append(pipeline_param.op_name) + + task['withParam'] = withparam_value + else: + task['withItems'] = sub_group.loop_args.to_list_for_task_yaml() + tasks.append(task) tasks.sort(key=lambda x: x['name']) template['dag'] = {'tasks': tasks} return template + def get_arguments_for_sub_group( + self, + sub_group: Union[OpsGroup, dsl._container_op.BaseOp], + is_recursive_subgroup: Optional[bool], + inputs: Dict[Text, Tuple[Text, Text]], + ): + arguments = [] + for param_name, dependent_name in inputs[sub_group.name]: + if is_recursive_subgroup: + for index, input in enumerate(sub_group.inputs): + if param_name == self._pipelineparam_full_name(input): + break + referenced_input = sub_group.recursive_ref.inputs[index] + argument_name = self._pipelineparam_full_name(referenced_input) + else: + argument_name = param_name + + # default argument_value + special cases + argument_value = '{{inputs.parameters.%s}}' % param_name + if isinstance(sub_group, dsl.ParallelFor): + if sub_group.loop_args.name in param_name: + if _for_loop.LoopArgumentVariable.name_is_loop_arguments_variable(param_name): + subvar_name = _for_loop.LoopArgumentVariable.get_subvar_name(param_name) + argument_value = '{{item.%s}}' % subvar_name + elif _for_loop.LoopArguments.name_is_loop_arguments(param_name) or sub_group.items_is_pipeline_param: + argument_value = '{{item}}' + else: + raise ValueError("Failed to match loop args with parameter. param_name: {}, ".format(param_name)) + elif dependent_name: + argument_value = '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name) + + arguments.append({ + 'name': argument_name, + 'value': argument_value, + }) + + arguments.sort(key=lambda x: x['name']) + + return arguments + def _create_dag_templates(self, pipeline, op_transformers=None, op_to_templates_handler=None): """Create all groups and ops templates in the pipeline. 
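[Note: the resolution rules that the refactored get_arguments_for_sub_group and the withParam block above implement reduce to two small decisions, sketched below; the helper names are illustrative, not the compiler's own, but the template strings mirror the diff exactly:

    def loop_item_value(subvar_name=None):
        # Looping over dicts binds '{{item.<key>}}'; looping over scalars binds '{{item}}'.
        return '{{item.%s}}' % subvar_name if subvar_name else '{{item}}'

    def with_param_value(pipeline_param):
        # A loop over a global pipeline input reads a workflow-level parameter;
        # a loop over another task's output reads that task's output parameter
        # (and the producing task is appended to the loop task's dependencies).
        if pipeline_param.op_name is None:
            return '{{workflow.parameters.%s}}' % pipeline_param.name
        param_name = '%s-%s' % (pipeline_param.op_name, pipeline_param.name)
        return '{{tasks.%s.outputs.parameters.%s}}' % (pipeline_param.op_name, param_name)
]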
diff --git a/sdk/python/tests/compiler/compiler_withparams_test.py b/sdk/python/tests/compiler/compiler_withparams_test.py index 693a6e2c5eb..16003647f80 100644 --- a/sdk/python/tests/compiler/compiler_withparams_test.py +++ b/sdk/python/tests/compiler/compiler_withparams_test.py @@ -1,5 +1,4 @@ import time -from typing import Iterable, Text from kfp.compiler import compiler from kfp import dsl @@ -18,7 +17,7 @@ # ------------------------------------------- if __name__ == '__main__': - do_output = True + do_output = False if do_output: @dsl.pipeline(name='my-pipeline') From 61602b1c971924184b5b49fbd328f11b35fdd0d8 Mon Sep 17 00:00:00 2001 From: Kevin Bache Date: Wed, 4 Sep 2019 18:22:10 -0700 Subject: [PATCH 08/15] added withparam tests --- sdk/python/comp.yaml | 54 +++++++++ sdk/python/kfp/compiler/compiler.py | 2 - sdk/python/tests/compiler/compiler_tests.py | 7 +- ....py => compiler_withparams_test_helper.py} | 2 + .../compiler/testdata/withparam_global.py | 60 ++++++++++ .../compiler/testdata/withparam_global.yaml | 108 ++++++++++++++++++ .../compiler/testdata/withparam_output.py | 60 ++++++++++ .../compiler/testdata/withparam_output.yaml | 105 +++++++++++++++++ 8 files changed, 395 insertions(+), 3 deletions(-) create mode 100644 sdk/python/comp.yaml rename sdk/python/tests/compiler/{compiler_withparams_test.py => compiler_withparams_test_helper.py} (95%) create mode 100644 sdk/python/tests/compiler/testdata/withparam_global.py create mode 100644 sdk/python/tests/compiler/testdata/withparam_global.yaml create mode 100644 sdk/python/tests/compiler/testdata/withparam_output.py create mode 100644 sdk/python/tests/compiler/testdata/withparam_output.yaml diff --git a/sdk/python/comp.yaml b/sdk/python/comp.yaml new file mode 100644 index 00000000000..c83f06a5955 --- /dev/null +++ b/sdk/python/comp.yaml @@ -0,0 +1,54 @@ +implementation: + container: + args: + - --a + - inputValue: a + - --b + - inputValue: b + - if: + cond: + isPresent: arg + then: + - --arg + - inputValue: arg + - '----output-paths' + - outputPath: Output + command: + - python3 + - -u + - -c + - | + def assert_is_none(a, b, arg=None) -> int: + assert arg is None + return 1 + + import argparse + _missing_arg = object() + _parser = argparse.ArgumentParser(prog='Assert is none', description='') + _parser.add_argument("--a", dest="a", type=str, required=True, default=_missing_arg) + _parser.add_argument("--b", dest="b", type=str, required=True, default=_missing_arg) + _parser.add_argument("--arg", dest="arg", type=str, required=False, default=_missing_arg) + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) + _parsed_args = {k: v for k, v in vars(_parser.parse_args()).items() if v is not _missing_arg} + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = assert_is_none(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + from pathlib import Path + for idx, filename in enumerate(_output_files): + _output_path = Path(filename) + _output_path.parent.mkdir(parents=True, exist_ok=True) + _output_path.write_text(str(_outputs[idx])) + image: tensorflow/tensorflow:1.13.2-py3 +inputs: +- name: a +- name: b +- name: arg + optional: true +name: Assert is none +outputs: +- name: Output + type: Integer diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 3c90a9bb05b..f6a313091f8 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -334,7 +334,6 @@ 
def _get_dependencies(self, pipeline, root_group, op_groups, opsgroups_groups, o
     dependencies = defaultdict(set)
     for op in pipeline.ops.values():
       upstream_op_names = set()
-      print(op.name)
       for param in op.inputs + list(condition_params[op.name]):
         if param.op_name:
           upstream_op_names.add(param.op_name)
@@ -433,7 +432,6 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies):
     # Generate tasks section.
     tasks = []
     for sub_group in group.groups + group.ops:
-      print(f"sub_group.name = {sub_group.name}")
       is_recursive_subgroup = (isinstance(sub_group, OpsGroup) and sub_group.recursive_ref)
       # Special handling for recursive subgroup: use the existing opsgroup name
       if is_recursive_subgroup:
diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py
index 09f02794f1a..4867365b417 100644
--- a/sdk/python/tests/compiler/compiler_tests.py
+++ b/sdk/python/tests/compiler/compiler_tests.py
@@ -670,7 +670,6 @@ def init_container_pipeline():
     init_container = init_containers[0]
     self.assertEqual(init_container, {'image':'alpine:latest', 'command': ['echo', 'bye'], 'name': 'echo'})
 
-
   def test_delete_resource_op(self):
     """Test a pipeline with a delete resource operation."""
     from kubernetes import client as k8s
@@ -702,3 +701,9 @@ def some_pipeline():
     self.assertIsNone(delete_op_template.get("successCondition"))
     self.assertIsNone(delete_op_template.get("failureCondition"))
     self.assertDictEqual(delete_op_template.get("outputs"), {})
+
+  def test_withparam_global(self):
+    self._test_py_compile_yaml('withparam_global')
+
+  def test_withparam_output(self):
+    self._test_py_compile_yaml('withparam_output')
diff --git a/sdk/python/tests/compiler/compiler_withparams_test.py b/sdk/python/tests/compiler/compiler_withparams_test_helper.py
similarity index 95%
rename from sdk/python/tests/compiler/compiler_withparams_test.py
rename to sdk/python/tests/compiler/compiler_withparams_test_helper.py
index 16003647f80..377ef2ed10d 100644
--- a/sdk/python/tests/compiler/compiler_withparams_test.py
+++ b/sdk/python/tests/compiler/compiler_withparams_test_helper.py
@@ -1,3 +1,5 @@
+"""This module is useful for generating yaml files for the withParams tests and for running informal
+compiler tests during development."""
 import time
 from kfp.compiler import compiler
 from kfp import dsl
diff --git a/sdk/python/tests/compiler/testdata/withparam_global.py b/sdk/python/tests/compiler/testdata/withparam_global.py
new file mode 100644
index 00000000000..2a63cbe6ea2
--- /dev/null
+++ b/sdk/python/tests/compiler/testdata/withparam_global.py
@@ -0,0 +1,60 @@
+# Copyright 2018 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
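+#
+# This testdata pipeline loops over the global pipeline param 'loopidy_doop';
+# the compiler is expected to emit
+# withParam: '{{workflow.parameters.loopidy-doop}}' for the loop
+# (see the expected withparam_global.yaml below).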
+ +import kfp.dsl as dsl +from kfp.dsl import _for_loop + + +class Coder: + def __init__(self, ): + self._code_id = 0 + + def get_code(self, ): + self._code_id += 1 + return '{code:0{num_chars:}d}'.format(code=self._code_id, num_chars=_for_loop.LoopArguments.NUM_CODE_CHARS) + + +dsl.ParallelFor._get_unique_id_code = Coder().get_code + + +@dsl.pipeline(name='my-pipeline') +def pipeline(loopidy_doop=[3, 5, 7, 9]): + op0 = dsl.ContainerOp( + name="my-out-cop0", + image='python:alpine3.6', + command=["sh", "-c"], + arguments=[ + 'python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'], + file_outputs={'out': '/tmp/out.json'}, + ) + + with dsl.ParallelFor(loopidy_doop) as item: + op1 = dsl.ContainerOp( + name="my-in-cop1", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo no output global op1, item: %s" % item], + ).after(op0) + + op_out = dsl.ContainerOp( + name="my-out-cop2", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo no output global op2, outp: %s" % op0.output], + ) + + +if __name__ == '__main__': + from kfp import compiler + print(compiler.Compiler().compile(pipeline, package_path=None)) diff --git a/sdk/python/tests/compiler/testdata/withparam_global.yaml b/sdk/python/tests/compiler/testdata/withparam_global.yaml new file mode 100644 index 00000000000..da3578e809a --- /dev/null +++ b/sdk/python/tests/compiler/testdata/withparam_global.yaml @@ -0,0 +1,108 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": [3, 5, 7, 9], "name": + "loopidy_doop"}], "name": "my-pipeline"}' + generateName: my-pipeline- +spec: + arguments: + parameters: + - name: loopidy-doop + value: '[3, 5, 7, 9]' + entrypoint: my-pipeline + serviceAccountName: pipeline-runner + templates: + - dag: + tasks: + - arguments: + parameters: + - name: loopidy-doop + value: '{{inputs.parameters.loopidy-doop}}' + name: my-in-cop1 + template: my-in-cop1 + inputs: + parameters: + - name: loopidy-doop + name: for-loop-for-loop-00000001-1 + - container: + args: + - 'echo no output global op1, item: {{inputs.parameters.loopidy-doop}}' + command: + - sh + - -c + image: library/bash:4.4.23 + inputs: + parameters: + - name: loopidy-doop + name: my-in-cop1 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + optional: true + path: /mlpipeline-ui-metadata.json + - name: mlpipeline-metrics + optional: true + path: /mlpipeline-metrics.json + - container: + args: + - python -c "import json; import sys; json.dump([i for i in range(20, 31)], + open('/tmp/out.json', 'w'))" + command: + - sh + - -c + image: python:alpine3.6 + name: my-out-cop0 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + optional: true + path: /mlpipeline-ui-metadata.json + - name: mlpipeline-metrics + optional: true + path: /mlpipeline-metrics.json + parameters: + - name: my-out-cop0-out + valueFrom: + path: /tmp/out.json + - container: + args: + - 'echo no output global op2, outp: {{inputs.parameters.my-out-cop0-out}}' + command: + - sh + - -c + image: library/bash:4.4.23 + inputs: + parameters: + - name: my-out-cop0-out + name: my-out-cop2 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + optional: true + path: /mlpipeline-ui-metadata.json + - name: mlpipeline-metrics + optional: true + path: /mlpipeline-metrics.json + - dag: + tasks: + - arguments: + parameters: + - name: loopidy-doop + value: '{{item}}' + dependencies: + - my-out-cop0 + name: 
for-loop-for-loop-00000001-1 + template: for-loop-for-loop-00000001-1 + withParam: '{{workflow.parameters.loopidy-doop}}' + - name: my-out-cop0 + template: my-out-cop0 + - arguments: + parameters: + - name: my-out-cop0-out + value: '{{tasks.my-out-cop0.outputs.parameters.my-out-cop0-out}}' + dependencies: + - my-out-cop0 + name: my-out-cop2 + template: my-out-cop2 + name: my-pipeline diff --git a/sdk/python/tests/compiler/testdata/withparam_output.py b/sdk/python/tests/compiler/testdata/withparam_output.py new file mode 100644 index 00000000000..e77fe9ef079 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/withparam_output.py @@ -0,0 +1,60 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kfp.dsl as dsl +from kfp.dsl import _for_loop + + +class Coder: + def __init__(self, ): + self._code_id = 0 + + def get_code(self, ): + self._code_id += 1 + return '{code:0{num_chars:}d}'.format(code=self._code_id, num_chars=_for_loop.LoopArguments.NUM_CODE_CHARS) + + +dsl.ParallelFor._get_unique_id_code = Coder().get_code + + +@dsl.pipeline(name='my-pipeline') +def pipeline(): + op0 = dsl.ContainerOp( + name="my-out-cop0", + image='python:alpine3.6', + command=["sh", "-c"], + arguments=[ + 'python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'], + file_outputs={'out': '/tmp/out.json'}, + ) + + with dsl.ParallelFor(op0.output) as item: + op1 = dsl.ContainerOp( + name="my-in-cop1", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo do output op1 item: %s" % item], + ) + + op_out = dsl.ContainerOp( + name="my-out-cop2", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo do output op2, outp: %s" % op0.output], + ) + + +if __name__ == '__main__': + from kfp import compiler + print(compiler.Compiler().compile(pipeline, package_path=None)) diff --git a/sdk/python/tests/compiler/testdata/withparam_output.yaml b/sdk/python/tests/compiler/testdata/withparam_output.yaml new file mode 100644 index 00000000000..c76a6f58543 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/withparam_output.yaml @@ -0,0 +1,105 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"name": "my-pipeline"}' + generateName: my-pipeline- +spec: + arguments: + parameters: [] + entrypoint: my-pipeline + serviceAccountName: pipeline-runner + templates: + - dag: + tasks: + - arguments: + parameters: + - name: my-out-cop0-out + value: '{{inputs.parameters.my-out-cop0-out}}' + name: my-in-cop1 + template: my-in-cop1 + inputs: + parameters: + - name: my-out-cop0-out + name: for-loop-for-loop-00000001-1 + - container: + args: + - 'echo do output op1 item: {{inputs.parameters.my-out-cop0-out}}' + command: + - sh + - -c + image: library/bash:4.4.23 + inputs: + parameters: + - name: my-out-cop0-out + name: my-in-cop1 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + optional: true + path: /mlpipeline-ui-metadata.json + - name: 
mlpipeline-metrics + optional: true + path: /mlpipeline-metrics.json + - container: + args: + - python -c "import json; import sys; json.dump([i for i in range(20, 31)], + open('/tmp/out.json', 'w'))" + command: + - sh + - -c + image: python:alpine3.6 + name: my-out-cop0 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + optional: true + path: /mlpipeline-ui-metadata.json + - name: mlpipeline-metrics + optional: true + path: /mlpipeline-metrics.json + parameters: + - name: my-out-cop0-out + valueFrom: + path: /tmp/out.json + - container: + args: + - 'echo do output op2, outp: {{inputs.parameters.my-out-cop0-out}}' + command: + - sh + - -c + image: library/bash:4.4.23 + inputs: + parameters: + - name: my-out-cop0-out + name: my-out-cop2 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + optional: true + path: /mlpipeline-ui-metadata.json + - name: mlpipeline-metrics + optional: true + path: /mlpipeline-metrics.json + - dag: + tasks: + - arguments: + parameters: + - name: my-out-cop0-out + value: '{{item}}' + dependencies: + - my-out-cop0 + name: for-loop-for-loop-00000001-1 + template: for-loop-for-loop-00000001-1 + withParam: '{{tasks.my-out-cop0.outputs.parameters.my-out-cop0-out}}' + - name: my-out-cop0 + template: my-out-cop0 + - arguments: + parameters: + - name: my-out-cop0-out + value: '{{tasks.my-out-cop0.outputs.parameters.my-out-cop0-out}}' + dependencies: + - my-out-cop0 + name: my-out-cop2 + template: my-out-cop2 + name: my-pipeline From 85284edec062112a1de350e2e9f6069e909de15f Mon Sep 17 00:00:00 2001 From: Kevin Bache Date: Wed, 4 Sep 2019 18:24:38 -0700 Subject: [PATCH 09/15] removed sdk/python/comp.yaml --- sdk/python/comp.yaml | 54 -------------------------------------------- 1 file changed, 54 deletions(-) delete mode 100644 sdk/python/comp.yaml diff --git a/sdk/python/comp.yaml b/sdk/python/comp.yaml deleted file mode 100644 index c83f06a5955..00000000000 --- a/sdk/python/comp.yaml +++ /dev/null @@ -1,54 +0,0 @@ -implementation: - container: - args: - - --a - - inputValue: a - - --b - - inputValue: b - - if: - cond: - isPresent: arg - then: - - --arg - - inputValue: arg - - '----output-paths' - - outputPath: Output - command: - - python3 - - -u - - -c - - | - def assert_is_none(a, b, arg=None) -> int: - assert arg is None - return 1 - - import argparse - _missing_arg = object() - _parser = argparse.ArgumentParser(prog='Assert is none', description='') - _parser.add_argument("--a", dest="a", type=str, required=True, default=_missing_arg) - _parser.add_argument("--b", dest="b", type=str, required=True, default=_missing_arg) - _parser.add_argument("--arg", dest="arg", type=str, required=False, default=_missing_arg) - _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) - _parsed_args = {k: v for k, v in vars(_parser.parse_args()).items() if v is not _missing_arg} - _output_files = _parsed_args.pop("_output_paths", []) - - _outputs = assert_is_none(**_parsed_args) - - if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): - _outputs = [_outputs] - - from pathlib import Path - for idx, filename in enumerate(_output_files): - _output_path = Path(filename) - _output_path.parent.mkdir(parents=True, exist_ok=True) - _output_path.write_text(str(_outputs[idx])) - image: tensorflow/tensorflow:1.13.2-py3 -inputs: -- name: a -- name: b -- name: arg - optional: true -name: Assert is none -outputs: -- name: Output - type: Integer From efb517d0493c11120b82d42147bf06b16128c0e7 Mon Sep 17 00:00:00 2001 From: Kevin Bache Date: Wed, 
4 Sep 2019 18:31:55 -0700
Subject: [PATCH 10/15] fix license headers and create loop subvariable attributes

---
 sdk/python/kfp/compiler/compiler.py | 2 +-
 sdk/python/kfp/dsl/_for_loop.py     | 7 +++++++
 sdk/python/kfp/dsl/_ops_group.py    | 2 +-
 3 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py
index f6a313091f8..8533b7094cf 100644
--- a/sdk/python/kfp/compiler/compiler.py
+++ b/sdk/python/kfp/compiler/compiler.py
@@ -1,4 +1,4 @@
- # Copyright 2018-2019 Google LLC
+# Copyright 2018-2019 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/sdk/python/kfp/dsl/_for_loop.py b/sdk/python/kfp/dsl/_for_loop.py
index 6f5e60907fa..f57e568c1d2 100644
--- a/sdk/python/kfp/dsl/_for_loop.py
+++ b/sdk/python/kfp/dsl/_for_loop.py
@@ -44,6 +44,13 @@ def __init__(self, items: Union[ItemList, dsl.PipelineParam], code: Text, name_o
                     raise ValueError("If you input a list of dicts then all dicts should have the same keys. "
                                      "Got: {}.".format(items))
 
+            # this block creates loop_args.variable_a and loop_args.variable_b
+            for subvar_name in subvar_names:
+                if not self._subvar_name_is_legal(subvar_name):
+                    raise ValueError("Tried to create subvariable named {} but that's not a legal Python variable "
+                                     "name.".format(subvar_name))
+                setattr(self, subvar_name, LoopArgumentVariable(self.name, subvar_name))
+
         self.items_or_pipeline_param = items
         self.referenced_subvar_names = []
 
diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py
index 588d7b34830..406a13bd5f8 100644
--- a/sdk/python/kfp/dsl/_ops_group.py
+++ b/sdk/python/kfp/dsl/_ops_group.py
@@ -10,7 +10,7 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-import json
+# limitations under the License.
 
 from typing import Union
 import uuid

From b68eb0dcb527e58e6cdd435eb04e0e3405a7e384 Mon Sep 17 00:00:00 2001
From: Kevin Bache
Date: Fri, 13 Sep 2019 15:45:27 -0700
Subject: [PATCH 11/15] subvars work

---
 sdk/python/kfp/compiler/compiler.py           |  6 ++--
 sdk/python/kfp/dsl/_for_loop.py               | 27 ++++++++++-----
 sdk/python/kfp/dsl/_ops_group.py              |  4 ++-
 sdk/python/kfp/dsl/_pipeline_param.py         |  2 +-
 .../compiler_withparams_test_helper.py        | 33 +++++++++----------
 5 files changed, 41 insertions(+), 31 deletions(-)

diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py
index 590fc69d6bf..077f60c3cd1 100644
--- a/sdk/python/kfp/compiler/compiler.py
+++ b/sdk/python/kfp/compiler/compiler.py
@@ -11,8 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+import json
 from collections import defaultdict
 import inspect
 import tarfile
@@ -598,6 +597,7 @@ def _create_dag_templates(self, pipeline, op_transformers=None, op_to_templates_
 
     for op in pipeline.ops.values():
       templates.extend(op_to_templates_handler(op))
+
     return templates
 
   def _create_volumes(self, pipeline):
@@ -623,7 +623,7 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None):
     for arg in args:
       param = {'name': arg.name}
       if arg.value is not None:
-        param['value'] = str(arg.value)
+        param['value'] = json.dumps(arg.value)
       input_params.append(param)
 
     # Templates
diff --git a/sdk/python/kfp/dsl/_for_loop.py b/sdk/python/kfp/dsl/_for_loop.py
index f57e568c1d2..95c05fdfd94 100644
--- a/sdk/python/kfp/dsl/_for_loop.py
+++ b/sdk/python/kfp/dsl/_for_loop.py
@@ -19,7 +19,7 @@ class LoopArguments(dsl.PipelineParam):
     def _subvar_name_is_legal(cls, proposed_variable_name: Text):
         return re.match(cls.LEGAL_SUBVAR_NAME_REGEX, proposed_variable_name) is not None
 
-    def __init__(self, items: Union[ItemList, dsl.PipelineParam], code: Text, name_override: Optional[Text]=None, op_name: Optional[Text]=None):
+    def __init__(self, items: Union[ItemList, dsl.PipelineParam], code: Text, name_override: Optional[Text]=None, op_name: Optional[Text]=None, *args, **kwargs):
         """_LoopArguments represent the set of items to loop over in a ParallelFor loop. This class shouldn't be instantiated by the user but rather is created by _ops_group.ParallelFor.
@@ -30,13 +30,16 @@ def __init__(self, items: Union[ItemList, dsl.PipelineParam], code: Text, name_o
         ops_group which created these _LoopArguments. This prevents parameter name collisions.
         """
         if name_override is None:
-            super().__init__(name=self._make_name(code))
+            super().__init__(name=self._make_name(code), *args, **kwargs)
         else:
-            super().__init__(name=name_override, op_name=op_name)
+            super().__init__(name=name_override, op_name=op_name, *args, **kwargs)
 
         if not isinstance(items, (list, tuple, dsl.PipelineParam)):
             raise TypeError("Expected list, tuple, or PipelineParam, got {}.".format(type(items)))
 
+        if isinstance(items, tuple):
+            items = list(items)
+
         if isinstance(items, list) and isinstance(items[0], dict):
             subvar_names = set(items[0].keys())
             for item in items:
@@ -54,6 +57,16 @@ def __init__(self, items: Union[ItemList, dsl.PipelineParam], code: Text, name_o
         self.items_or_pipeline_param = items
         self.referenced_subvar_names = []
 
+    @classmethod
+    def from_pipeline_param(cls, param: dsl.PipelineParam) -> 'LoopArguments':
+        return LoopArguments(
+            items=param,
+            code=None,
+            name_override=param.name,
+            op_name=param.op_name,
+            value=param.value,
+        )
+
     def __getattr__(self, item):
         # this is being overridden so that we can access subvariables of the LoopArguments (i.e.: item.a) without
         # knowing the subvariable names ahead of time
@@ -77,7 +90,7 @@ def __getattr__(self, item):
         """Return True if the given parameter name looks like it came from a loop arguments parameter."""
         return re.match(
             '%s-[0-9a-f]{%s}' % (cls.LOOP_ITEM_PARAM_NAME_BASE, cls.NUM_CODE_CHARS),
-            param_name
+            param_name,
         ) is not None
 
 
@@ -116,11 +129,7 @@ def get_name(cls, loop_args_name: Text, this_variable_name: Text) -> Text:
     def name_is_loop_arguments_variable(cls, param_name: Text) -> bool:
        """Return True if the given parameter name looks like it came from a LoopArgumentsVariable."""
        return re.match(
-            '%s-[0-9a-f]{%s}%s.*' % (
-                LoopArguments.LOOP_ITEM_PARAM_NAME_BASE,
-                LoopArguments.NUM_CODE_CHARS,
-                cls.SUBVAR_NAME_DELIMITER
-            ),
+            
'.+%s.+' % cls.SUBVAR_NAME_DELIMITER, param_name ) is not None diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 406a13bd5f8..5ece2ca9c2c 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -178,7 +178,9 @@ def __init__(self, loop_args: Union[_for_loop.ItemList, _pipeline_param.Pipeline super().__init__(self.TYPE_NAME, name=group_name) self.items_is_pipeline_param = isinstance(loop_args, _pipeline_param.PipelineParam) - if not self.items_is_pipeline_param and not isinstance(loop_args, _for_loop.LoopArguments): + if self.items_is_pipeline_param: + loop_args = _for_loop.LoopArguments.from_pipeline_param(loop_args) + elif not self.items_is_pipeline_param and not isinstance(loop_args, _for_loop.LoopArguments): # we were passed a raw list, wrap it in loop args loop_args = _for_loop.LoopArguments(loop_args, code) diff --git a/sdk/python/kfp/dsl/_pipeline_param.py b/sdk/python/kfp/dsl/_pipeline_param.py index 10fa31f9959..607143459f3 100644 --- a/sdk/python/kfp/dsl/_pipeline_param.py +++ b/sdk/python/kfp/dsl/_pipeline_param.py @@ -191,7 +191,7 @@ def __str__(self): return '{{pipelineparam:op=%s;name=%s}}' % (op_name, self.name) def __repr__(self): - return str({self.__class__.__name__: self.__dict__}) + return str({self.__class__.__name__: self.__dict__}) def __eq__(self, other): return ConditionOperator('==', self, other) diff --git a/sdk/python/tests/compiler/compiler_withparams_test_helper.py b/sdk/python/tests/compiler/compiler_withparams_test_helper.py index 377ef2ed10d..eac22146b47 100644 --- a/sdk/python/tests/compiler/compiler_withparams_test_helper.py +++ b/sdk/python/tests/compiler/compiler_withparams_test_helper.py @@ -4,23 +4,25 @@ from kfp.compiler import compiler from kfp import dsl +from kfp.dsl import _for_loop + + +class Coder: + def __init__(self, ): + self._code_id = 0 + + def get_code(self, ): + self._code_id += 1 + return '{code:0{num_chars:}d}'.format(code=self._code_id, num_chars=_for_loop.LoopArguments.NUM_CODE_CHARS) + + +dsl.ParallelFor._get_unique_id_code = Coder().get_code -# reproducible UUIDs: https://stackoverflow.com/a/56757552/9357327 -import uuid -import random -# ------------------------------------------- -# Remove this block to generate different -# UUIDs everytime you run this code. -# This block should be right below the uuid -# import. 
-rd = random.Random() -rd.seed(0) -uuid.uuid4 = lambda: uuid.UUID(int=rd.getrandbits(128)) -# ------------------------------------------- if __name__ == '__main__': do_output = False + params = {} if do_output: @dsl.pipeline(name='my-pipeline') def pipeline(): @@ -32,7 +34,6 @@ def pipeline(): file_outputs={'out': '/tmp/out.json'}, ) - # loop_args = [{'a': 1, 'b': 2}, {'a': 10, 'b': 20}] with dsl.ParallelFor(op0.output) as item: op1 = dsl.ContainerOp( name="my-in-cop1", @@ -49,10 +50,9 @@ def pipeline(): ) job_name = f'do-output=TRUE-passed-{time.time()}' - params = {} else: @dsl.pipeline(name='my-pipeline') - def pipeline(loopidy_doop=[3, 5, 7, 9]): + def pipeline(loopidy_doop=[{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]): op0 = dsl.ContainerOp( name="my-out-cop0", image='python:alpine3.6', @@ -66,7 +66,7 @@ def pipeline(loopidy_doop=[3, 5, 7, 9]): name="my-in-cop1", image="library/bash:4.4.23", command=["sh", "-c"], - arguments=["echo no output global op1, item: %s" % item], + arguments=["echo no output global op1, item: %s" % item.a], ).after(op0) op_out = dsl.ContainerOp( @@ -77,7 +77,6 @@ def pipeline(loopidy_doop=[3, 5, 7, 9]): ) job_name = f'do-output=FALSE-global-{time.time()}' - params = {} yaml_text = compiler.Compiler().compile(pipeline, None) print(yaml_text) From 8d19ea8910b22e6111a1ed0a1bdcbbf1e7327ec0 Mon Sep 17 00:00:00 2001 From: Kevin Bache Date: Fri, 13 Sep 2019 16:39:33 -0700 Subject: [PATCH 12/15] more tests --- sdk/python/tests/compiler/compiler_tests.py | 6 + .../compiler_withparams_test_helper.py | 7 +- .../testdata/withparam_global_dict.py | 60 ++++++++++ .../testdata/withparam_global_dict.yaml | 108 ++++++++++++++++++ .../testdata/withparam_output_dict.py | 59 ++++++++++ .../testdata/withparam_output_dict.yaml | 105 +++++++++++++++++ 6 files changed, 342 insertions(+), 3 deletions(-) create mode 100644 sdk/python/tests/compiler/testdata/withparam_global_dict.py create mode 100644 sdk/python/tests/compiler/testdata/withparam_global_dict.yaml create mode 100644 sdk/python/tests/compiler/testdata/withparam_output_dict.py create mode 100644 sdk/python/tests/compiler/testdata/withparam_output_dict.yaml diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 0a4aef18638..0422e07d4ef 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -705,9 +705,15 @@ def some_pipeline(): def test_withparam_global(self): self._test_py_compile_yaml('withparam_global') + def test_withparam_global_dict(self): + self._test_py_compile_yaml('withparam_global_dict') + def test_withparam_output(self): self._test_py_compile_yaml('withparam_output') + def test_withparam_output_dict(self): + self._test_py_compile_yaml('withparam_output_dict') + def test_py_input_artifact_raw_value(self): """Test pipeline input_artifact_raw_value.""" self._test_py_compile_yaml('input_artifact_raw_value') diff --git a/sdk/python/tests/compiler/compiler_withparams_test_helper.py b/sdk/python/tests/compiler/compiler_withparams_test_helper.py index eac22146b47..1a20019da3c 100644 --- a/sdk/python/tests/compiler/compiler_withparams_test_helper.py +++ b/sdk/python/tests/compiler/compiler_withparams_test_helper.py @@ -20,7 +20,7 @@ def get_code(self, ): if __name__ == '__main__': - do_output = False + do_output = True params = {} if do_output: @@ -30,7 +30,8 @@ def pipeline(): name="my-out-cop0", image='python:alpine3.6', command=["sh", "-c"], - arguments=['python -c "import json; import sys; json.dump([i for i in 
range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'], + # arguments=['python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'], + arguments=['python -c "import json; import sys; json.dump([{\'a\': 1, \'b\': 2}, {\'a\': 10, \'b\': 20}], open(\'/tmp/out.json\', \'w\'))"'], file_outputs={'out': '/tmp/out.json'}, ) @@ -39,7 +40,7 @@ def pipeline(): name="my-in-cop1", image="library/bash:4.4.23", command=["sh", "-c"], - arguments=["echo do output op1 item: %s" % item], + arguments=["echo do output op1 item.a: %s" % item.a], ) op_out = dsl.ContainerOp( diff --git a/sdk/python/tests/compiler/testdata/withparam_global_dict.py b/sdk/python/tests/compiler/testdata/withparam_global_dict.py new file mode 100644 index 00000000000..cb307574894 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/withparam_global_dict.py @@ -0,0 +1,60 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kfp.dsl as dsl +from kfp.dsl import _for_loop + + +class Coder: + def __init__(self, ): + self._code_id = 0 + + def get_code(self, ): + self._code_id += 1 + return '{code:0{num_chars:}d}'.format(code=self._code_id, num_chars=_for_loop.LoopArguments.NUM_CODE_CHARS) + + +dsl.ParallelFor._get_unique_id_code = Coder().get_code + + +@dsl.pipeline(name='my-pipeline') +def pipeline(loopidy_doop=[{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]): + op0 = dsl.ContainerOp( + name="my-out-cop0", + image='python:alpine3.6', + command=["sh", "-c"], + arguments=[ + 'python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'], + file_outputs={'out': '/tmp/out.json'}, + ) + + with dsl.ParallelFor(loopidy_doop) as item: + op1 = dsl.ContainerOp( + name="my-in-cop1", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo no output global op1, item: %s" % item], + ).after(op0) + + op_out = dsl.ContainerOp( + name="my-out-cop2", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo no output global op2, outp: %s" % op0.output], + ) + + +if __name__ == '__main__': + from kfp import compiler + print(compiler.Compiler().compile(pipeline, package_path=None)) diff --git a/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml b/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml new file mode 100644 index 00000000000..d3cdba5caee --- /dev/null +++ b/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml @@ -0,0 +1,108 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": [{"a": 1, "b": + 2}, {"a": 10, "b": 20}], "name": "loopidy_doop"}], "name": "my-pipeline"}' + generateName: my-pipeline- +spec: + arguments: + parameters: + - name: loopidy-doop + value: '[{"a": 1, "b": 2}, {"a": 10, "b": 20}]' + entrypoint: my-pipeline + serviceAccountName: pipeline-runner + templates: + - dag: + tasks: + - arguments: + parameters: + - name: loopidy-doop + value: 
'{{inputs.parameters.loopidy-doop}}' + name: my-in-cop1 + template: my-in-cop1 + inputs: + parameters: + - name: loopidy-doop + name: for-loop-for-loop-00000001-1 + - container: + args: + - 'echo no output global op1, item: {{inputs.parameters.loopidy-doop}}' + command: + - sh + - -c + image: library/bash:4.4.23 + inputs: + parameters: + - name: loopidy-doop + name: my-in-cop1 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + optional: true + path: /mlpipeline-ui-metadata.json + - name: mlpipeline-metrics + optional: true + path: /mlpipeline-metrics.json + - container: + args: + - python -c "import json; import sys; json.dump([i for i in range(20, 31)], + open('/tmp/out.json', 'w'))" + command: + - sh + - -c + image: python:alpine3.6 + name: my-out-cop0 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + optional: true + path: /mlpipeline-ui-metadata.json + - name: mlpipeline-metrics + optional: true + path: /mlpipeline-metrics.json + parameters: + - name: my-out-cop0-out + valueFrom: + path: /tmp/out.json + - container: + args: + - 'echo no output global op2, outp: {{inputs.parameters.my-out-cop0-out}}' + command: + - sh + - -c + image: library/bash:4.4.23 + inputs: + parameters: + - name: my-out-cop0-out + name: my-out-cop2 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + optional: true + path: /mlpipeline-ui-metadata.json + - name: mlpipeline-metrics + optional: true + path: /mlpipeline-metrics.json + - dag: + tasks: + - arguments: + parameters: + - name: loopidy-doop + value: '{{item}}' + dependencies: + - my-out-cop0 + name: for-loop-for-loop-00000001-1 + template: for-loop-for-loop-00000001-1 + withParam: '{{workflow.parameters.loopidy-doop}}' + - name: my-out-cop0 + template: my-out-cop0 + - arguments: + parameters: + - name: my-out-cop0-out + value: '{{tasks.my-out-cop0.outputs.parameters.my-out-cop0-out}}' + dependencies: + - my-out-cop0 + name: my-out-cop2 + template: my-out-cop2 + name: my-pipeline \ No newline at end of file diff --git a/sdk/python/tests/compiler/testdata/withparam_output_dict.py b/sdk/python/tests/compiler/testdata/withparam_output_dict.py new file mode 100644 index 00000000000..1cd0aa5dd61 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/withparam_output_dict.py @@ -0,0 +1,59 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import kfp.dsl as dsl +from kfp.dsl import _for_loop + + +class Coder: + def __init__(self, ): + self._code_id = 0 + + def get_code(self, ): + self._code_id += 1 + return '{code:0{num_chars:}d}'.format(code=self._code_id, num_chars=_for_loop.LoopArguments.NUM_CODE_CHARS) + + +dsl.ParallelFor._get_unique_id_code = Coder().get_code + + +@dsl.pipeline(name='my-pipeline') +def pipeline(): + op0 = dsl.ContainerOp( + name="my-out-cop0", + image='python:alpine3.6', + command=["sh", "-c"], + arguments=['python -c "import json; import sys; json.dump([{\'a\': 1, \'b\': 2}, {\'a\': 10, \'b\': 20}], open(\'/tmp/out.json\', \'w\'))"'], + file_outputs={'out': '/tmp/out.json'}, + ) + + with dsl.ParallelFor(op0.output) as item: + op1 = dsl.ContainerOp( + name="my-in-cop1", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo do output op1 item: %s" % item], + ) + + op_out = dsl.ContainerOp( + name="my-out-cop2", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo do output op2, outp: %s" % op0.output], + ) + + +if __name__ == '__main__': + from kfp import compiler + print(compiler.Compiler().compile(pipeline, package_path=None)) diff --git a/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml b/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml new file mode 100644 index 00000000000..ff04daf4023 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml @@ -0,0 +1,105 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"name": "my-pipeline"}' + generateName: my-pipeline- +spec: + arguments: + parameters: [] + entrypoint: my-pipeline + serviceAccountName: pipeline-runner + templates: + - dag: + tasks: + - arguments: + parameters: + - name: out-subvar-a + value: '{{inputs.parameters.out-subvar-a}}' + name: my-in-cop1 + template: my-in-cop1 + inputs: + parameters: + - name: out-subvar-a + name: for-loop-for-loop-00000001-1 + - container: + args: + - 'echo do output op1 item.a: {{inputs.parameters.out-subvar-a}}' + command: + - sh + - -c + image: library/bash:4.4.23 + inputs: + parameters: + - name: out-subvar-a + name: my-in-cop1 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + optional: true + path: /mlpipeline-ui-metadata.json + - name: mlpipeline-metrics + optional: true + path: /mlpipeline-metrics.json + - container: + args: + - 'python -c "import json; import sys; json.dump([{''a'': 1, ''b'': 2}, {''a'': + 10, ''b'': 20}], open(''/tmp/out.json'', ''w''))"' + command: + - sh + - -c + image: python:alpine3.6 + name: my-out-cop0 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + optional: true + path: /mlpipeline-ui-metadata.json + - name: mlpipeline-metrics + optional: true + path: /mlpipeline-metrics.json + parameters: + - name: my-out-cop0-out + valueFrom: + path: /tmp/out.json + - container: + args: + - 'echo do output op2, outp: {{inputs.parameters.my-out-cop0-out}}' + command: + - sh + - -c + image: library/bash:4.4.23 + inputs: + parameters: + - name: my-out-cop0-out + name: my-out-cop2 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + optional: true + path: /mlpipeline-ui-metadata.json + - name: mlpipeline-metrics + optional: true + path: /mlpipeline-metrics.json + - dag: + tasks: + - arguments: + parameters: + - name: out-subvar-a + value: '{{item.a}}' + dependencies: + - my-out-cop0 + name: for-loop-for-loop-00000001-1 + template: for-loop-for-loop-00000001-1 + withParam: 
'{{tasks.my-out-cop0.outputs.parameters.my-out-cop0-out}}' + - name: my-out-cop0 + template: my-out-cop0 + - arguments: + parameters: + - name: my-out-cop0-out + value: '{{tasks.my-out-cop0.outputs.parameters.my-out-cop0-out}}' + dependencies: + - my-out-cop0 + name: my-out-cop2 + template: my-out-cop2 + name: my-pipeline From d0942d9483fe732e538eb39114c69b63be36987b Mon Sep 17 00:00:00 2001 From: Kevin Bache Date: Fri, 13 Sep 2019 17:47:34 -0700 Subject: [PATCH 13/15] removed unneeded artifact outputs from test yaml --- sdk/python/kfp/compiler/compiler.py | 8 ++-- sdk/python/kfp/dsl/_for_loop.py | 2 +- sdk/python/kfp/dsl/_ops_group.py | 1 - .../compiler_withparams_test_helper.py | 3 +- .../tests/compiler/testdata/withitem_basic.py | 13 +++++++ .../compiler/testdata/withitem_basic.yaml | 4 +- .../compiler/testdata/withitem_nested.py | 18 +++++++++ .../compiler/testdata/withitem_nested.yaml | 4 +- .../compiler/testdata/withparam_global.py | 13 +++++++ .../compiler/testdata/withparam_global.yaml | 23 ----------- .../testdata/withparam_global_dict.py | 15 ++++++- .../testdata/withparam_global_dict.yaml | 39 ++++--------------- .../compiler/testdata/withparam_output.py | 13 +++++++ .../compiler/testdata/withparam_output.yaml | 23 ----------- .../testdata/withparam_output_dict.py | 15 ++++++- .../testdata/withparam_output_dict.yaml | 23 ----------- 16 files changed, 104 insertions(+), 113 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 1d57b2095b9..950cbc584f5 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -17,10 +17,9 @@ import tarfile import zipfile from typing import Callable, Set, List, Text, Dict, Tuple, Any, Union, Optional -from typing import Any, Set, List, Text, Dict import yaml -from kfp.dsl import _container_op, _for_loop +from kfp.dsl import _for_loop from .. 
import dsl from ._k8s_helper import K8sHelper @@ -627,7 +626,10 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None): for arg in args: param = {'name': arg.name} if arg.value is not None: - param['value'] = json.dumps(arg.value) + if isinstance(arg.value, (list, tuple)): + param['value'] = json.dumps(arg.value) + else: + param['value'] = str(arg.value) input_params.append(param) # Templates diff --git a/sdk/python/kfp/dsl/_for_loop.py b/sdk/python/kfp/dsl/_for_loop.py index 95c05fdfd94..dd2d56b6628 100644 --- a/sdk/python/kfp/dsl/_for_loop.py +++ b/sdk/python/kfp/dsl/_for_loop.py @@ -74,7 +74,7 @@ def __getattr__(self, item): return LoopArgumentVariable(self.name, item) def to_list_for_task_yaml(self): - if isinstance(self.items_or_pipeline_param, list): + if isinstance(self.items_or_pipeline_param, (list, tuple)): return self.items_or_pipeline_param else: raise ValueError("You should only call this method on loop args which have list items, " diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 5ece2ca9c2c..5f2e1ba4856 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -177,7 +177,6 @@ def __init__(self, loop_args: Union[_for_loop.ItemList, _pipeline_param.Pipeline group_name = 'for-loop-{}'.format(code) super().__init__(self.TYPE_NAME, name=group_name) - self.items_is_pipeline_param = isinstance(loop_args, _pipeline_param.PipelineParam) if self.items_is_pipeline_param: loop_args = _for_loop.LoopArguments.from_pipeline_param(loop_args) elif not self.items_is_pipeline_param and not isinstance(loop_args, _for_loop.LoopArguments): diff --git a/sdk/python/tests/compiler/compiler_withparams_test_helper.py b/sdk/python/tests/compiler/compiler_withparams_test_helper.py index 1a20019da3c..9814fb248fb 100644 --- a/sdk/python/tests/compiler/compiler_withparams_test_helper.py +++ b/sdk/python/tests/compiler/compiler_withparams_test_helper.py @@ -30,8 +30,7 @@ def pipeline(): name="my-out-cop0", image='python:alpine3.6', command=["sh", "-c"], - # arguments=['python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'], - arguments=['python -c "import json; import sys; json.dump([{\'a\': 1, \'b\': 2}, {\'a\': 10, \'b\': 20}], open(\'/tmp/out.json\', \'w\'))"'], + arguments=['python -c "import json; import sys; json.dump([{\'a\': 1, \'b\': 2}, {\'a\': 10, \'b\': 20}], open(\'/tmp/out.json\', \'w\'))"'], file_outputs={'out': '/tmp/out.json'}, ) diff --git a/sdk/python/tests/compiler/testdata/withitem_basic.py b/sdk/python/tests/compiler/testdata/withitem_basic.py index b19bd83db28..0f6aa56ff86 100644 --- a/sdk/python/tests/compiler/testdata/withitem_basic.py +++ b/sdk/python/tests/compiler/testdata/withitem_basic.py @@ -57,3 +57,16 @@ def pipeline(my_pipe_param=10): if __name__ == '__main__': from kfp import compiler print(compiler.Compiler().compile(pipeline, package_path=None)) + + import kfp + client = kfp.Client(host='127.0.0.1:8080/pipeline') + + pkg_path = '/tmp/witest_pkg.tar.gz' + compiler.Compiler().compile(pipeline, package_path=pkg_path) + exp = client.create_experiment('withparams_exp') + client.run_pipeline( + experiment_id=exp.id, + job_name='withitem_basic', + pipeline_package_path=pkg_path, + params={}, + ) diff --git a/sdk/python/tests/compiler/testdata/withitem_basic.yaml b/sdk/python/tests/compiler/testdata/withitem_basic.yaml index 5cd187eaa45..0cbf73eab21 100644 --- a/sdk/python/tests/compiler/testdata/withitem_basic.yaml +++ 
b/sdk/python/tests/compiler/testdata/withitem_basic.yaml @@ -2,8 +2,8 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: annotations: - pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": - 10, "name": "my_pipe_param"}], "name": "my-pipeline"}' + pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": 10, "name": "my_pipe_param"}], + "name": "my-pipeline"}' generateName: my-pipeline- spec: arguments: diff --git a/sdk/python/tests/compiler/testdata/withitem_nested.py b/sdk/python/tests/compiler/testdata/withitem_nested.py index cf6fd915174..3de8211482b 100644 --- a/sdk/python/tests/compiler/testdata/withitem_nested.py +++ b/sdk/python/tests/compiler/testdata/withitem_nested.py @@ -60,3 +60,21 @@ def pipeline(my_pipe_param=10): command=["sh", "-c"], arguments=["echo %s" % my_pipe_param], ) + + +if __name__ == '__main__': + from kfp import compiler + import kfp + import time + client = kfp.Client(host='127.0.0.1:8080/pipeline') + print(compiler.Compiler().compile(pipeline, package_path=None)) + + pkg_path = '/tmp/witest_pkg.tar.gz' + compiler.Compiler().compile(pipeline, package_path=pkg_path) + exp = client.create_experiment('withparams_exp') + client.run_pipeline( + experiment_id=exp.id, + job_name='withitem_nested_{}'.format(time.time()), + pipeline_package_path=pkg_path, + params={}, + ) diff --git a/sdk/python/tests/compiler/testdata/withitem_nested.yaml b/sdk/python/tests/compiler/testdata/withitem_nested.yaml index 70c2d740298..7a5eb3e85b5 100644 --- a/sdk/python/tests/compiler/testdata/withitem_nested.yaml +++ b/sdk/python/tests/compiler/testdata/withitem_nested.yaml @@ -2,8 +2,8 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: annotations: - pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": - 10, "name": "my_pipe_param"}], "name": "my-pipeline"}' + pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": 10, "name": "my_pipe_param"}], + "name": "my-pipeline"}' generateName: my-pipeline- spec: arguments: diff --git a/sdk/python/tests/compiler/testdata/withparam_global.py b/sdk/python/tests/compiler/testdata/withparam_global.py index 2a63cbe6ea2..ef0921b31bc 100644 --- a/sdk/python/tests/compiler/testdata/withparam_global.py +++ b/sdk/python/tests/compiler/testdata/withparam_global.py @@ -57,4 +57,17 @@ def pipeline(loopidy_doop=[3, 5, 7, 9]): if __name__ == '__main__': from kfp import compiler + import kfp + import time + client = kfp.Client(host='127.0.0.1:8080/pipeline') print(compiler.Compiler().compile(pipeline, package_path=None)) + + pkg_path = '/tmp/witest_pkg.tar.gz' + compiler.Compiler().compile(pipeline, package_path=pkg_path) + exp = client.create_experiment('withparams_exp') + client.run_pipeline( + experiment_id=exp.id, + job_name='withparam_global_{}'.format(time.time()), + pipeline_package_path=pkg_path, + params={}, + ) diff --git a/sdk/python/tests/compiler/testdata/withparam_global.yaml b/sdk/python/tests/compiler/testdata/withparam_global.yaml index da3578e809a..ae9c7df1a0a 100644 --- a/sdk/python/tests/compiler/testdata/withparam_global.yaml +++ b/sdk/python/tests/compiler/testdata/withparam_global.yaml @@ -36,14 +36,6 @@ spec: parameters: - name: loopidy-doop name: my-in-cop1 - outputs: - artifacts: - - name: mlpipeline-ui-metadata - optional: true - path: /mlpipeline-ui-metadata.json - - name: mlpipeline-metrics - optional: true - path: /mlpipeline-metrics.json - container: args: - python -c "import json; import sys; json.dump([i for i in range(20, 31)], @@ -54,13 +46,6 @@ spec: image: python:alpine3.6 
name: my-out-cop0 outputs: - artifacts: - - name: mlpipeline-ui-metadata - optional: true - path: /mlpipeline-ui-metadata.json - - name: mlpipeline-metrics - optional: true - path: /mlpipeline-metrics.json parameters: - name: my-out-cop0-out valueFrom: @@ -76,14 +61,6 @@ spec: parameters: - name: my-out-cop0-out name: my-out-cop2 - outputs: - artifacts: - - name: mlpipeline-ui-metadata - optional: true - path: /mlpipeline-ui-metadata.json - - name: mlpipeline-metrics - optional: true - path: /mlpipeline-metrics.json - dag: tasks: - arguments: diff --git a/sdk/python/tests/compiler/testdata/withparam_global_dict.py b/sdk/python/tests/compiler/testdata/withparam_global_dict.py index cb307574894..837998ff2aa 100644 --- a/sdk/python/tests/compiler/testdata/withparam_global_dict.py +++ b/sdk/python/tests/compiler/testdata/withparam_global_dict.py @@ -44,7 +44,7 @@ def pipeline(loopidy_doop=[{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]): name="my-in-cop1", image="library/bash:4.4.23", command=["sh", "-c"], - arguments=["echo no output global op1, item: %s" % item], + arguments=["echo no output global op1, item.a: %s" % item.a], ).after(op0) op_out = dsl.ContainerOp( @@ -57,4 +57,17 @@ def pipeline(loopidy_doop=[{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]): if __name__ == '__main__': from kfp import compiler + import kfp + import time + client = kfp.Client(host='127.0.0.1:8080/pipeline') print(compiler.Compiler().compile(pipeline, package_path=None)) + + pkg_path = '/tmp/witest_pkg.tar.gz' + compiler.Compiler().compile(pipeline, package_path=pkg_path) + exp = client.create_experiment('withparams_exp') + client.run_pipeline( + experiment_id=exp.id, + job_name='withparam_global_dict_{}'.format(time.time()), + pipeline_package_path=pkg_path, + params={}, + ) diff --git a/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml b/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml index d3cdba5caee..a0c29186a22 100644 --- a/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml +++ b/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml @@ -17,33 +17,25 @@ spec: tasks: - arguments: parameters: - - name: loopidy-doop - value: '{{inputs.parameters.loopidy-doop}}' + - name: loopidy-doop-subvar-a + value: '{{inputs.parameters.loopidy-doop-subvar-a}}' name: my-in-cop1 template: my-in-cop1 inputs: parameters: - - name: loopidy-doop + - name: loopidy-doop-subvar-a name: for-loop-for-loop-00000001-1 - container: args: - - 'echo no output global op1, item: {{inputs.parameters.loopidy-doop}}' + - 'echo no output global op1, item.a: {{inputs.parameters.loopidy-doop-subvar-a}}' command: - sh - -c image: library/bash:4.4.23 inputs: parameters: - - name: loopidy-doop + - name: loopidy-doop-subvar-a name: my-in-cop1 - outputs: - artifacts: - - name: mlpipeline-ui-metadata - optional: true - path: /mlpipeline-ui-metadata.json - - name: mlpipeline-metrics - optional: true - path: /mlpipeline-metrics.json - container: args: - python -c "import json; import sys; json.dump([i for i in range(20, 31)], @@ -54,13 +46,6 @@ spec: image: python:alpine3.6 name: my-out-cop0 outputs: - artifacts: - - name: mlpipeline-ui-metadata - optional: true - path: /mlpipeline-ui-metadata.json - - name: mlpipeline-metrics - optional: true - path: /mlpipeline-metrics.json parameters: - name: my-out-cop0-out valueFrom: @@ -76,20 +61,12 @@ spec: parameters: - name: my-out-cop0-out name: my-out-cop2 - outputs: - artifacts: - - name: mlpipeline-ui-metadata - optional: true - path: /mlpipeline-ui-metadata.json - - name: 
mlpipeline-metrics - optional: true - path: /mlpipeline-metrics.json - dag: tasks: - arguments: parameters: - - name: loopidy-doop - value: '{{item}}' + - name: loopidy-doop-subvar-a + value: '{{item.a}}' dependencies: - my-out-cop0 name: for-loop-for-loop-00000001-1 @@ -105,4 +82,4 @@ spec: - my-out-cop0 name: my-out-cop2 template: my-out-cop2 - name: my-pipeline \ No newline at end of file + name: my-pipeline diff --git a/sdk/python/tests/compiler/testdata/withparam_output.py b/sdk/python/tests/compiler/testdata/withparam_output.py index e77fe9ef079..b4255d0a813 100644 --- a/sdk/python/tests/compiler/testdata/withparam_output.py +++ b/sdk/python/tests/compiler/testdata/withparam_output.py @@ -57,4 +57,17 @@ def pipeline(): if __name__ == '__main__': from kfp import compiler + import kfp + import time + client = kfp.Client(host='127.0.0.1:8080/pipeline') print(compiler.Compiler().compile(pipeline, package_path=None)) + + pkg_path = '/tmp/witest_pkg.tar.gz' + compiler.Compiler().compile(pipeline, package_path=pkg_path) + exp = client.create_experiment('withparams_exp') + client.run_pipeline( + experiment_id=exp.id, + job_name='withparam_output_{}'.format(time.time()), + pipeline_package_path=pkg_path, + params={}, + ) diff --git a/sdk/python/tests/compiler/testdata/withparam_output.yaml b/sdk/python/tests/compiler/testdata/withparam_output.yaml index c76a6f58543..d9150aad37b 100644 --- a/sdk/python/tests/compiler/testdata/withparam_output.yaml +++ b/sdk/python/tests/compiler/testdata/withparam_output.yaml @@ -33,14 +33,6 @@ spec: parameters: - name: my-out-cop0-out name: my-in-cop1 - outputs: - artifacts: - - name: mlpipeline-ui-metadata - optional: true - path: /mlpipeline-ui-metadata.json - - name: mlpipeline-metrics - optional: true - path: /mlpipeline-metrics.json - container: args: - python -c "import json; import sys; json.dump([i for i in range(20, 31)], @@ -51,13 +43,6 @@ spec: image: python:alpine3.6 name: my-out-cop0 outputs: - artifacts: - - name: mlpipeline-ui-metadata - optional: true - path: /mlpipeline-ui-metadata.json - - name: mlpipeline-metrics - optional: true - path: /mlpipeline-metrics.json parameters: - name: my-out-cop0-out valueFrom: @@ -73,14 +58,6 @@ spec: parameters: - name: my-out-cop0-out name: my-out-cop2 - outputs: - artifacts: - - name: mlpipeline-ui-metadata - optional: true - path: /mlpipeline-ui-metadata.json - - name: mlpipeline-metrics - optional: true - path: /mlpipeline-metrics.json - dag: tasks: - arguments: diff --git a/sdk/python/tests/compiler/testdata/withparam_output_dict.py b/sdk/python/tests/compiler/testdata/withparam_output_dict.py index 1cd0aa5dd61..12b1a6d9db5 100644 --- a/sdk/python/tests/compiler/testdata/withparam_output_dict.py +++ b/sdk/python/tests/compiler/testdata/withparam_output_dict.py @@ -43,7 +43,7 @@ def pipeline(): name="my-in-cop1", image="library/bash:4.4.23", command=["sh", "-c"], - arguments=["echo do output op1 item: %s" % item], + arguments=["echo do output op1 item.a: %s" % item.a], ) op_out = dsl.ContainerOp( @@ -56,4 +56,17 @@ def pipeline(): if __name__ == '__main__': from kfp import compiler + import kfp + import time + client = kfp.Client(host='127.0.0.1:8080/pipeline') print(compiler.Compiler().compile(pipeline, package_path=None)) + + pkg_path = '/tmp/witest_pkg.tar.gz' + compiler.Compiler().compile(pipeline, package_path=pkg_path) + exp = client.create_experiment('withparams_exp') + client.run_pipeline( + experiment_id=exp.id, + job_name='withparam_output_dict_{}'.format(time.time()), + 
pipeline_package_path=pkg_path, + params={}, + ) diff --git a/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml b/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml index ff04daf4023..9859e323e1f 100644 --- a/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml +++ b/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml @@ -33,14 +33,6 @@ spec: parameters: - name: out-subvar-a name: my-in-cop1 - outputs: - artifacts: - - name: mlpipeline-ui-metadata - optional: true - path: /mlpipeline-ui-metadata.json - - name: mlpipeline-metrics - optional: true - path: /mlpipeline-metrics.json - container: args: - 'python -c "import json; import sys; json.dump([{''a'': 1, ''b'': 2}, {''a'': @@ -51,13 +43,6 @@ spec: image: python:alpine3.6 name: my-out-cop0 outputs: - artifacts: - - name: mlpipeline-ui-metadata - optional: true - path: /mlpipeline-ui-metadata.json - - name: mlpipeline-metrics - optional: true - path: /mlpipeline-metrics.json parameters: - name: my-out-cop0-out valueFrom: @@ -73,14 +58,6 @@ spec: parameters: - name: my-out-cop0-out name: my-out-cop2 - outputs: - artifacts: - - name: mlpipeline-ui-metadata - optional: true - path: /mlpipeline-ui-metadata.json - - name: mlpipeline-metrics - optional: true - path: /mlpipeline-metrics.json - dag: tasks: - arguments: From 29a2eb086ef655261b3d5a4fe8cf49f88633295f Mon Sep 17 00:00:00 2001 From: Kevin Bache Date: Fri, 13 Sep 2019 18:08:30 -0700 Subject: [PATCH 14/15] sort keys --- sdk/python/kfp/compiler/compiler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 950cbc584f5..b1ea0559ebc 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -627,7 +627,7 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None): param = {'name': arg.name} if arg.value is not None: if isinstance(arg.value, (list, tuple)): - param['value'] = json.dumps(arg.value) + param['value'] = json.dumps(arg.value, sort_keys=True) else: param['value'] = str(arg.value) input_params.append(param) From 1073511dbbe70dfb523ff23c92722d39a66ffda2 Mon Sep 17 00:00:00 2001 From: Kevin Bache Date: Mon, 16 Sep 2019 12:04:06 -0700 Subject: [PATCH 15/15] removed dead artifact code --- sdk/python/kfp/compiler/compiler.py | 11 ----------- sdk/python/tests/compiler/compiler_tests.py | 2 +- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index b1ea0559ebc..373d55be75c 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -487,17 +487,6 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies): else: task['withItems'] = sub_group.loop_args.to_list_for_task_yaml() - if isinstance(sub_group, dsl.ContainerOp) and sub_group.artifact_arguments: - artifact_argument_structs = [] - for input_name, argument in sub_group.artifact_arguments.items(): - artifact_argument_dict = {'name': input_name} - if isinstance(argument, str): - artifact_argument_dict['raw'] = {'data': str(argument)} - else: - raise TypeError('Argument "{}" was passed to the artifact input "{}", but only constant strings are supported at this moment.'.format(str(argument), input_name)) - artifact_argument_structs.append(artifact_argument_dict) - task.setdefault('arguments', {})['artifacts'] = artifact_argument_structs - tasks.append(task) tasks.sort(key=lambda x: x['name']) template['dag'] = {'tasks': 
tasks} diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index aa977de7709..2fdff0b92b4 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -728,4 +728,4 @@ def test_withparam_output_dict(self): def test_py_input_artifact_raw_value(self): """Test pipeline input_artifact_raw_value.""" self._test_py_compile_yaml('input_artifact_raw_value') - \ No newline at end of file +
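
A note on the naming convention the dict-item tests above exercise: when a loop
body references item.a, the compiler surfaces that sub-key to the loop-body
template as a dedicated input parameter, and the withParam task passes it
'{{item.a}}'. Below is a minimal, self-contained sketch of that mapping. The
helper names are invented for illustration and the '-subvar-' delimiter is
inferred from parameter names like 'out-subvar-a' in the YAML above; the real
logic lives in kfp.dsl._for_loop, not in these functions.

# Illustrative sketch only; not the SDK's API.
SUBVAR_NAME_DELIMITER = '-subvar-'  # assumed value, inferred from 'out-subvar-a'

def subvar_param_name(loop_args_name, subvar_name):
    # Input-parameter name the loop-body template receives for item.<key>,
    # e.g. ('out', 'a') -> 'out-subvar-a'.
    return loop_args_name + SUBVAR_NAME_DELIMITER + subvar_name

def item_reference(subvar_name=None):
    # Argo reference placed in the withParam task's arguments: the whole
    # item, or a single key of a dict item.
    return '{{item}}' if subvar_name is None else '{{item.%s}}' % subvar_name

assert subvar_param_name('out', 'a') == 'out-subvar-a'  # cf. withparam_output_dict.yaml
assert item_reference('a') == '{{item.a}}'              # dict item sub-key
assert item_reference() == '{{item}}'                   # whole item, cf. withparam_global.yaml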