diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index 827e355cda9..aa6668e9683 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -4,6 +4,7 @@ * Support local execution of sequential pipelines [\#10423](https://github.com/kubeflow/pipelines/pull/10423) * Support local execution of `dsl.importer` components [\#10431](https://github.com/kubeflow/pipelines/pull/10431) * Support local execution of pipelines in pipelines [\#10440](https://github.com/kubeflow/pipelines/pull/10440) +* Support dsl.ParallelFor over list of Artifacts [\#10441](https://github.com/kubeflow/pipelines/pull/10441) ## Breaking changes diff --git a/sdk/python/kfp/compiler/compiler_utils.py b/sdk/python/kfp/compiler/compiler_utils.py index 3a54cb041c3..f173c11d0c7 100644 --- a/sdk/python/kfp/compiler/compiler_utils.py +++ b/sdk/python/kfp/compiler/compiler_utils.py @@ -247,7 +247,8 @@ def get_inputs_for_all_groups( channel_to_add = channel while isinstance(channel_to_add, ( - for_loop.LoopArgument, + for_loop.LoopParameterArgument, + for_loop.LoopArtifactArgument, for_loop.LoopArgumentVariable, )): channels_to_add.append(channel_to_add) @@ -309,10 +310,11 @@ def get_inputs_for_all_groups( # loop items, we have to go from bottom-up because the # PipelineChannel can be originated from the middle a DAG, # which is not needed and visible to its parent DAG. 
- if isinstance( - channel, - (for_loop.LoopArgument, for_loop.LoopArgumentVariable - )) and channel.is_with_items_loop_argument: + if isinstance(channel, ( + for_loop.LoopParameterArgument, + for_loop.LoopArtifactArgument, + for_loop.LoopArgumentVariable, + )) and channel.is_with_items_loop_argument: for group_name in task_name_to_parent_groups[ task.name][::-1]: diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index a7e5546c102..86e446673e2 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -23,6 +23,7 @@ from google.protobuf import json_format from google.protobuf import struct_pb2 import kfp +from kfp import dsl from kfp.compiler import compiler_utils from kfp.dsl import component_factory from kfp.dsl import for_loop @@ -128,8 +129,10 @@ def build_task_spec_for_task( task._task_spec.retry_policy.to_proto()) for input_name, input_value in task.inputs.items(): - # since LoopArgument and LoopArgumentVariable are narrower types than PipelineParameterChannel, start with it - if isinstance(input_value, for_loop.LoopArgument): + # Since LoopParameterArgument, LoopArtifactArgument, and LoopArgumentVariable are narrower + # types than PipelineParameterChannel and PipelineArtifactChannel, start with them. + + if isinstance(input_value, for_loop.LoopParameterArgument): component_input_parameter = ( compiler_utils.additional_input_name_for_pipeline_channel( @@ -140,6 +143,17 @@ def build_task_spec_for_task( input_name].component_input_parameter = ( component_input_parameter) + elif isinstance(input_value, for_loop.LoopArtifactArgument): + + component_input_artifact = ( + compiler_utils.additional_input_name_for_pipeline_channel( + input_value)) + assert component_input_artifact in parent_component_inputs.artifacts, \ + f'component_input_artifact: {component_input_artifact} not found. 
All inputs: {parent_component_inputs}' + pipeline_task_spec.inputs.artifacts[ + input_name].component_input_artifact = ( + component_input_artifact) + elif isinstance(input_value, for_loop.LoopArgumentVariable): component_input_parameter = ( @@ -155,7 +169,7 @@ def build_task_spec_for_task( f'parseJson(string_value)["{input_value.subvar_name}"]') elif isinstance(input_value, pipeline_channel.PipelineArtifactChannel) or ( - isinstance(input_value, for_loop.Collected) and + isinstance(input_value, dsl.Collected) and input_value.is_artifact_channel): if input_value.task_name: @@ -190,7 +204,7 @@ def build_task_spec_for_task( elif isinstance(input_value, pipeline_channel.PipelineParameterChannel) or ( - isinstance(input_value, for_loop.Collected) and + isinstance(input_value, dsl.Collected) and not input_value.is_artifact_channel): if input_value.task_name: @@ -683,19 +697,25 @@ def build_component_spec_for_group( input_name = compiler_utils.additional_input_name_for_pipeline_channel( channel) - if isinstance(channel, pipeline_channel.PipelineArtifactChannel): + if isinstance(channel, (pipeline_channel.PipelineArtifactChannel, + for_loop.LoopArtifactArgument)): component_spec.input_definitions.artifacts[ input_name].artifact_type.CopyFrom( type_utils.bundled_artifact_to_artifact_proto( channel.channel_type)) component_spec.input_definitions.artifacts[ input_name].is_artifact_list = channel.is_artifact_list - else: - # channel is one of PipelineParameterChannel, LoopArgument, or - # LoopArgumentVariable. 
+ elif isinstance(channel, + (pipeline_channel.PipelineParameterChannel, + for_loop.LoopParameterArgument, + for_loop.LoopArgumentVariable, dsl.Collected)): component_spec.input_definitions.parameters[ input_name].parameter_type = type_utils.get_parameter_type( channel.channel_type) + else: + raise TypeError( + f'Expected PipelineParameterChannel, PipelineArtifactChannel, LoopParameterArgument, LoopArtifactArgument, LoopArgumentVariable, or Collected, got {type(channel)}.' + ) for output_name, output in output_pipeline_channels.items(): if isinstance(output, pipeline_channel.PipelineArtifactChannel): @@ -747,13 +767,34 @@ def _update_task_spec_for_loop_group( loop_argument_item_name = compiler_utils.additional_input_name_for_pipeline_channel( group.loop_argument.full_name) - loop_arguments_item = f'{input_parameter_name}-{for_loop.LoopArgument.LOOP_ITEM_NAME_BASE}' + loop_arguments_item = f'{input_parameter_name}-{for_loop.LOOP_ITEM_NAME_BASE}' assert loop_arguments_item == loop_argument_item_name - pipeline_task_spec.parameter_iterator.items.input_parameter = ( - input_parameter_name) - pipeline_task_spec.parameter_iterator.item_input = ( - loop_argument_item_name) + if isinstance(group.loop_argument, for_loop.LoopParameterArgument): + pipeline_task_spec.parameter_iterator.items.input_parameter = ( + input_parameter_name) + pipeline_task_spec.parameter_iterator.item_input = ( + loop_argument_item_name) + + _pop_input_from_task_spec( + task_spec=pipeline_task_spec, + input_name=pipeline_task_spec.parameter_iterator.item_input) + + elif isinstance(group.loop_argument, for_loop.LoopArtifactArgument): + input_artifact_name = compiler_utils.additional_input_name_for_pipeline_channel( + loop_items_channel) + + pipeline_task_spec.artifact_iterator.items.input_artifact = input_artifact_name + pipeline_task_spec.artifact_iterator.item_input = ( + loop_argument_item_name) + + _pop_input_from_task_spec( + task_spec=pipeline_task_spec, + 
input_name=pipeline_task_spec.artifact_iterator.item_input) + else: + raise TypeError( + f'Expected LoopParameterArgument or LoopArtifactArgument, got {type(group.loop_argument)}.' + ) # If the loop items itself is a loop arguments variable, handle the # subvar name. @@ -777,14 +818,14 @@ def _update_task_spec_for_loop_group( pipeline_task_spec.parameter_iterator.item_input = ( input_parameter_name) + _pop_input_from_task_spec( + task_spec=pipeline_task_spec, + input_name=pipeline_task_spec.parameter_iterator.item_input) + if (group.parallelism_limit > 0): pipeline_task_spec.iterator_policy.parallelism_limit = ( group.parallelism_limit) - _pop_input_from_task_spec( - task_spec=pipeline_task_spec, - input_name=pipeline_task_spec.parameter_iterator.item_input) - def _binary_operations_to_cel_conjunctive( operations: List[pipeline_channel.ConditionOperation]) -> str: @@ -1290,10 +1331,11 @@ def build_spec_by_group( for channel in subgroup_input_channels: # Skip 'withItems' loop arguments if it's from an inner loop. 
- if isinstance( - channel, - (for_loop.LoopArgument, for_loop.LoopArgumentVariable - )) and channel.is_with_items_loop_argument: + if isinstance(channel, ( + for_loop.LoopParameterArgument, + for_loop.LoopArtifactArgument, + for_loop.LoopArgumentVariable, + )) and channel.is_with_items_loop_argument: withitems_loop_arg_found_in_self_or_upstream = False for group_name in group_name_to_parent_groups[ subgroup.name][::-1]: @@ -1782,7 +1824,7 @@ def _rename_component_refs( def validate_pipeline_outputs_dict( pipeline_outputs_dict: Dict[str, pipeline_channel.PipelineChannel]): for channel in pipeline_outputs_dict.values(): - if isinstance(channel, for_loop.Collected): + if isinstance(channel, dsl.Collected): # this validation doesn't apply to Collected continue diff --git a/sdk/python/kfp/dsl/for_loop.py b/sdk/python/kfp/dsl/for_loop.py index 14e0a427736..ea2894420ef 100644 --- a/sdk/python/kfp/dsl/for_loop.py +++ b/sdk/python/kfp/dsl/for_loop.py @@ -20,6 +20,9 @@ ItemList = List[Union[int, float, str, Dict[str, Any]]] +LOOP_ITEM_NAME_BASE = 'loop-item' +LOOP_ITEM_PARAM_NAME_BASE = 'loop-item-param' + def _get_loop_item_type(type_name: str) -> Optional[str]: """Extracts the loop item type. @@ -64,24 +67,28 @@ def _get_subvar_type(type_name: str) -> Optional[str]: return match['value_type'].lstrip().rstrip() if match else None -class LoopArgument(pipeline_channel.PipelineParameterChannel): - """Represents the argument that are looped over in a ParallelFor loop. +def _make_name(code: str) -> str: + """Makes a name for a loop argument from a unique code.""" + return f'{LOOP_ITEM_PARAM_NAME_BASE}-{code}' + + +class LoopParameterArgument(pipeline_channel.PipelineParameterChannel): + """Represents the parameter arguments that are looped over in a ParallelFor + loop. The class shouldn't be instantiated by the end user, rather it is created automatically by a ParallelFor ops group. 
- To create a LoopArgument instance, use one of its factory methods:: + To create a LoopParameterArgument instance, use one of its factory methods:: - LoopArgument.from_pipeline_channel(...) - LoopArgument.from_raw_items(...) + LoopParameterArgument.from_pipeline_channel(...) + LoopParameterArgument.from_raw_items(...) Attributes: items_or_pipeline_channel: The raw items or the PipelineParameterChannel object - this LoopArgument is associated to. + this LoopParameterArgument is associated to. """ - LOOP_ITEM_NAME_BASE = 'loop-item' - LOOP_ITEM_PARAM_NAME_BASE = 'loop-item-param' def __init__( self, @@ -90,7 +97,7 @@ def __init__( name_override: Optional[str] = None, **kwargs, ): - """Initializes a LoopArguments object. + """Initializes a LoopParameterArgument object. Args: items: List of items to loop over. If a list of dicts then, all @@ -108,7 +115,7 @@ def __init__( 'be specified.') if name_override is None: - super().__init__(name=self._make_name(name_code), **kwargs) + super().__init__(name=_make_name(name_code), **kwargs) else: super().__init__(name=name_override, **kwargs) @@ -149,20 +156,16 @@ def __getattr__(self, name: str): subvar_name=name, )) - def _make_name(self, code: str): - """Makes a name for this loop argument from a unique code.""" - return f'{self.LOOP_ITEM_PARAM_NAME_BASE}-{code}' - @classmethod def from_pipeline_channel( cls, channel: pipeline_channel.PipelineParameterChannel, - ) -> 'LoopArgument': - """Creates a LoopArgument object from a PipelineParameterChannel - object.""" - return LoopArgument( + ) -> 'LoopParameterArgument': + """Creates a LoopParameterArgument object from a + PipelineParameterChannel object.""" + return LoopParameterArgument( items=channel, - name_override=channel.name + '-' + cls.LOOP_ITEM_NAME_BASE, + name_override=channel.name + '-' + LOOP_ITEM_NAME_BASE, task_name=channel.task_name, channel_type=_get_loop_item_type(channel.channel_type) or 'String', ) @@ -172,26 +175,89 @@ def from_raw_items( cls, raw_items: 
ItemList, name_code: str, - ) -> 'LoopArgument': - """Creates a LoopArgument object from raw item list.""" + ) -> 'LoopParameterArgument': + """Creates a LoopParameterArgument object from raw item list.""" if len(raw_items) == 0: raise ValueError('Got an empty item list for loop argument.') - return LoopArgument( + return LoopParameterArgument( items=raw_items, name_code=name_code, channel_type=type(raw_items[0]).__name__, ) - @classmethod - def name_is_loop_argument(cls, name: str) -> bool: - """Returns True if the given channel name looks like a loop argument. - Either it came from a withItems loop item or withParams loop - item. +class LoopArtifactArgument(pipeline_channel.PipelineArtifactChannel): + """Represents the artifact arguments that are looped over in a ParallelFor + loop. + + The class shouldn't be instantiated by the end user, rather it is + created automatically by a ParallelFor ops group. + + To create a LoopArtifactArgument instance, use the factory method:: + + LoopArtifactArgument.from_pipeline_channel(...) + + + Attributes: + items_or_pipeline_channel: The PipelineArtifactChannel object this + LoopArtifactArgument is associated to. + """ + + def __init__( + self, + items: pipeline_channel.PipelineArtifactChannel, + name_code: Optional[str] = None, + name_override: Optional[str] = None, + **kwargs, + ): + """Initializes a LoopArtifactArgument object. + + Args: + items: The PipelineArtifactChannel object this LoopArtifactArgument is + associated to. + name_code: A unique code used to identify these loop arguments. + Should match the code for the ParallelFor ops_group which created + these LoopArtifactArguments. This prevents parameter name collisions. + name_override: The override name for PipelineArtifactChannel. + **kwargs: Any other keyword arguments passed down to PipelineArtifactChannel. 
""" - return ('-' + cls.LOOP_ITEM_NAME_BASE) in name \ - or (cls.LOOP_ITEM_PARAM_NAME_BASE + '-') in name + if (name_code is None) == (name_override is None): + raise ValueError( + 'Expect one and only one of `name_code` and `name_override` to ' + 'be specified.') + + # We don't support nested lists so `is_artifact_list` is always False. + if name_override is None: + super().__init__( + name=_make_name(name_code), is_artifact_list=False, **kwargs) + else: + super().__init__( + name=name_override, is_artifact_list=False, **kwargs) + + self.items_or_pipeline_channel = items + self.is_with_items_loop_argument = not isinstance( + items, pipeline_channel.PipelineArtifactChannel) + + @classmethod + def from_pipeline_channel( + cls, + channel: pipeline_channel.PipelineArtifactChannel, + ) -> 'LoopArtifactArgument': + """Creates a LoopArtifactArgument object from a PipelineArtifactChannel + object.""" + if not channel.is_artifact_list: + raise ValueError( + 'Cannot iterate over a single Artifact using `dsl.ParallelFor`. Expected a list of Artifacts as argument to `items`.' + ) + return LoopArtifactArgument( + items=channel, + name_override=channel.name + '-' + LOOP_ITEM_NAME_BASE, + task_name=channel.task_name, + channel_type=channel.channel_type, + ) + + # TODO: support artifact constants here. class LoopArgumentVariable(pipeline_channel.PipelineParameterChannel): @@ -215,13 +281,13 @@ class LoopArgumentVariable(pipeline_channel.PipelineParameterChannel): def __init__( self, - loop_argument: LoopArgument, + loop_argument: LoopParameterArgument, subvar_name: str, ): """Initializes a LoopArgumentVariable instance. Args: - loop_argument: The LoopArgument object this subvariable is based on + loop_argument: The LoopParameterArgument object this subvariable is based on a subvariable to. subvar_name: The name of this subvariable, which is the name of the dict key that spawned this subvariable. 
@@ -251,7 +317,7 @@ def __init__( def items_or_pipeline_channel( self) -> Union[ItemList, pipeline_channel.PipelineParameterChannel]: """Returns the loop argument items.""" - return self.loop_argument.items_or_pipeline_chanenl + return self.loop_argument.items_or_pipeline_channel @property def is_with_items_loop_argument(self) -> bool: @@ -301,16 +367,17 @@ def __init__( output: pipeline_channel.PipelineChannel, ) -> None: self.output = output + # we know all dsl.Collected instances are lists, so set `is_artifact_list` + # for type checking, which occurs before dsl.Collected is updated to + # its "correct" channel during compilation if isinstance(output, pipeline_channel.PipelineArtifactChannel): channel_type = output.channel_type self.is_artifact_channel = True - # we know all dsl.Collected instances are lists, so set to true - # for type checking, which occurs before dsl.Collected is updated to - # it's "correct" channel during compilation self.is_artifact_list = True else: channel_type = 'LIST' self.is_artifact_channel = False + self.is_artifact_list = False super().__init__( output.name, diff --git a/sdk/python/kfp/dsl/for_loop_test.py b/sdk/python/kfp/dsl/for_loop_test.py index 7d1559c87bb..a37c5e5541a 100644 --- a/sdk/python/kfp/dsl/for_loop_test.py +++ b/sdk/python/kfp/dsl/for_loop_test.py @@ -19,6 +19,15 @@ from kfp.dsl import pipeline_channel +def name_is_loop_argument(name: str) -> bool: + """Returns True if the given channel name looks like a loop argument. + + Either it came from a withItems loop item or withParams loop item. 
+ """ + return ('-' + for_loop.LOOP_ITEM_NAME_BASE) in name \ + or (for_loop.LOOP_ITEM_PARAM_NAME_BASE + '-') in name + + class ForLoopTest(parameterized.TestCase): @parameterized.parameters( @@ -89,12 +98,63 @@ def test_get_subvar_type(self, dict_type, value_type): '{{channel:task=task1;name=output1-loop-item;type=Dict[str, str];}}', }, ) - def test_loop_argument_from_pipeline_channel(self, channel, - expected_serialization_value): - loop_argument = for_loop.LoopArgument.from_pipeline_channel(channel) + def test_loop_parameter_argument_from_pipeline_channel( + self, channel, expected_serialization_value): + loop_argument = for_loop.LoopParameterArgument.from_pipeline_channel( + channel) self.assertEqual(loop_argument.items_or_pipeline_channel, channel) self.assertEqual(str(loop_argument), expected_serialization_value) + @parameterized.parameters( + { + 'channel': + pipeline_channel.PipelineArtifactChannel( + name='param1', + channel_type='system.Artifact@0.0.1', + task_name='task1', + is_artifact_list=True, + ), + 'expected_serialization_value': + '{{channel:task=task1;name=param1-loop-item;type=system.Artifact@0.0.1;}}', + }, + { + 'channel': + pipeline_channel.PipelineArtifactChannel( + name='output1', + channel_type='system.Dataset@0.0.1', + task_name='task1', + is_artifact_list=True, + ), + 'expected_serialization_value': + '{{channel:task=task1;name=output1-loop-item;type=system.Dataset@0.0.1;}}', + }, + ) + def test_loop_artifact_argument_from_pipeline_channel( + self, channel, expected_serialization_value): + loop_argument = for_loop.LoopArtifactArgument.from_pipeline_channel( + channel) + self.assertEqual(loop_argument.items_or_pipeline_channel, channel), + self.assertEqual(str(loop_argument), expected_serialization_value) + + @parameterized.parameters( + { + 'channel': + pipeline_channel.PipelineArtifactChannel( + name='param1', + channel_type='system.Artifact@0.0.1', + task_name='task1', + is_artifact_list=False, + ), + },) + def 
test_loop_artifact_argument_from_single_pipeline_channel_raises_error( + self, channel): + with self.assertRaisesRegex( + ValueError, + r'Cannot iterate over a single Artifact using `dsl\.ParallelFor`\. Expected a list of Artifacts as argument to `items`\.' + ): + loop_argument = for_loop.LoopArtifactArgument.from_pipeline_channel( + channel) + @parameterized.parameters( { 'raw_items': ['a', 'b', 'c'], @@ -120,7 +180,7 @@ def test_loop_argument_from_pipeline_channel(self, channel, ) def test_loop_argument_from_raw_items(self, raw_items, name_code, expected_serialization_value): - loop_argument = for_loop.LoopArgument.from_raw_items( + loop_argument = for_loop.LoopParameterArgument.from_raw_items( raw_items, name_code) self.assertEqual(loop_argument.items_or_pipeline_channel, raw_items) self.assertEqual(str(loop_argument), expected_serialization_value) @@ -148,8 +208,7 @@ def test_loop_argument_from_raw_items(self, raw_items, name_code, }, ) def test_name_is_loop_argument(self, name, expected_result): - self.assertEqual( - for_loop.LoopArgument.name_is_loop_argument(name), expected_result) + self.assertEqual(name_is_loop_argument(name), expected_result) @parameterized.parameters( { @@ -178,7 +237,7 @@ def test_name_is_loop_argument(self, name, expected_result): }, ) def test_create_loop_argument_varaible(self, subvar_name, valid): - loop_argument = for_loop.LoopArgument.from_pipeline_channel( + loop_argument = for_loop.LoopParameterArgument.from_pipeline_channel( pipeline_channel.PipelineParameterChannel( name='param1', channel_type='List[Dict[str, str]]', diff --git a/sdk/python/kfp/dsl/pipeline_channel.py b/sdk/python/kfp/dsl/pipeline_channel.py index 6adb52525cd..4731030709f 100644 --- a/sdk/python/kfp/dsl/pipeline_channel.py +++ b/sdk/python/kfp/dsl/pipeline_channel.py @@ -267,6 +267,7 @@ def __init__( channel_type: The type of the pipeline channel. task_name: Optional; the name of the task that produces the pipeline channel. 
+ is_artifact_list: True if `channel_type` represents a list of the artifact type. Raises: ValueError: If name or task_name contains invalid characters. diff --git a/sdk/python/kfp/dsl/tasks_group.py b/sdk/python/kfp/dsl/tasks_group.py index 3f0f758bbd3..c19fed788dc 100644 --- a/sdk/python/kfp/dsl/tasks_group.py +++ b/sdk/python/kfp/dsl/tasks_group.py @@ -454,20 +454,27 @@ def __init__( is_root=False, ) - if isinstance(items, pipeline_channel.PipelineChannel): - self.loop_argument = for_loop.LoopArgument.from_pipeline_channel( + if isinstance(items, pipeline_channel.PipelineParameterChannel): + self.loop_argument = for_loop.LoopParameterArgument.from_pipeline_channel( + items) + self.items_is_pipeline_channel = True + elif isinstance(items, pipeline_channel.PipelineArtifactChannel): + self.loop_argument = for_loop.LoopArtifactArgument.from_pipeline_channel( items) self.items_is_pipeline_channel = True else: - self.loop_argument = for_loop.LoopArgument.from_raw_items( + self.loop_argument = for_loop.LoopParameterArgument.from_raw_items( raw_items=items, name_code=pipeline_context.Pipeline.get_default_pipeline() .get_next_group_id(), ) self.items_is_pipeline_channel = False + # TODO: support artifact constants here. 
self.parallelism_limit = parallelism - def __enter__(self) -> for_loop.LoopArgument: + def __enter__( + self + ) -> Union[for_loop.LoopParameterArgument, for_loop.LoopArtifactArgument]: super().__enter__() return self.loop_argument diff --git a/sdk/python/kfp/dsl/tasks_group_test.py b/sdk/python/kfp/dsl/tasks_group_test.py index 40c68ab3725..92bcf505887 100644 --- a/sdk/python/kfp/dsl/tasks_group_test.py +++ b/sdk/python/kfp/dsl/tasks_group_test.py @@ -26,7 +26,7 @@ def test_basic(self): loop_items = ['pizza', 'hotdog', 'pasta'] with pipeline_context.Pipeline('pipeline') as p: with tasks_group.ParallelFor(items=loop_items) as parallel_for: - loop_argument = for_loop.LoopArgument.from_raw_items( + loop_argument = for_loop.LoopParameterArgument.from_raw_items( loop_items, '1') self.assertEqual(parallel_for.group_type, 'for-loop') self.assertEqual(parallel_for.parallelism, 0) @@ -37,7 +37,7 @@ def test_parallelfor_valid_parallelism(self): with pipeline_context.Pipeline('pipeline') as p: with tasks_group.ParallelFor( items=loop_items, parallelism=3) as parallel_for: - loop_argument = for_loop.LoopArgument.from_raw_items( + loop_argument = for_loop.LoopParameterArgument.from_raw_items( loop_items, '1') self.assertEqual(parallel_for.group_type, 'for-loop') self.assertEqual(parallel_for.parallelism, 3) @@ -48,7 +48,7 @@ def test_parallelfor_zero_parallelism(self): with pipeline_context.Pipeline('pipeline') as p: with tasks_group.ParallelFor( items=loop_items, parallelism=0) as parallel_for: - loop_argument = for_loop.LoopArgument.from_raw_items( + loop_argument = for_loop.LoopParameterArgument.from_raw_items( loop_items, '1') self.assertEqual(parallel_for.group_type, 'for-loop') self.assertEqual(parallel_for.parallelism, 0) diff --git a/sdk/python/test_data/pipelines/pipeline_with_parallelfor_list_artifacts.py b/sdk/python/test_data/pipelines/pipeline_with_parallelfor_list_artifacts.py new file mode 100644 index 00000000000..64ab75d31d5 --- /dev/null +++ 
b/sdk/python/test_data/pipelines/pipeline_with_parallelfor_list_artifacts.py @@ -0,0 +1,77 @@ +# Copyright 2024 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List + +from kfp import compiler +from kfp import dsl +from kfp.dsl import Artifact +from kfp.dsl import Dataset + + +@dsl.component +def print_artifact_name(artifact: Artifact) -> str: + print(artifact.name) + return artifact.name + + +@dsl.component +def make_dataset(data: str) -> Dataset: + dataset = Dataset(uri=dsl.get_uri(), metadata={'length': len(data)}) + with open(dataset.path, 'w') as f: + f.write(data) + return dataset + + +@dsl.pipeline +def make_datasets( + texts: List[str] = ['Hello', ',', ' ', 'world!']) -> List[Dataset]: + with dsl.ParallelFor(texts) as text: + t1 = make_dataset(data=text) + + return dsl.Collected(t1.output) + + +@dsl.component +def make_artifact(data: str) -> Artifact: + artifact = Artifact(uri=dsl.get_uri(), metadata={'length': len(data)}) + with open(artifact.path, 'w') as f: + f.write(data) + return artifact + + +@dsl.pipeline +def make_artifacts( + texts: List[str] = ['Hello', ',', ' ', 'world!']) -> List[Artifact]: + with dsl.ParallelFor(texts) as text: + t1 = make_artifact(data=text) + + return dsl.Collected(t1.output) + + +@dsl.pipeline(name='pipeline-parallelfor-artifacts') +def my_pipeline(): + make_artifacts_task = make_artifacts() + with dsl.ParallelFor(items=make_artifacts_task.output) as item: + 
print_artifact_name(artifact=item) + + make_datasets_task = make_datasets() + with dsl.ParallelFor(items=make_datasets_task.output) as item: + print_artifact_name(artifact=item) + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=my_pipeline, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/pipeline_with_parallelfor_list_artifacts.yaml b/sdk/python/test_data/pipelines/pipeline_with_parallelfor_list_artifacts.yaml new file mode 100644 index 00000000000..93a4efd7160 --- /dev/null +++ b/sdk/python/test_data/pipelines/pipeline_with_parallelfor_list_artifacts.yaml @@ -0,0 +1,420 @@ +# PIPELINE DEFINITION +# Name: pipeline-parallelfor-artifacts +components: + comp-for-loop-1: + dag: + tasks: + print-artifact-name: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-artifact-name + inputs: + artifacts: + artifact: + componentInputArtifact: pipelinechannel--make-artifacts-Output-loop-item + taskInfo: + name: print-artifact-name + inputDefinitions: + artifacts: + pipelinechannel--make-artifacts-Output: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + isArtifactList: true + pipelinechannel--make-artifacts-Output-loop-item: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + comp-for-loop-1-2: + dag: + outputs: + artifacts: + pipelinechannel--make-artifact-Output: + artifactSelectors: + - outputArtifactKey: Output + producerSubtask: make-artifact + tasks: + make-artifact: + cachingOptions: + enableCache: true + componentRef: + name: comp-make-artifact + inputs: + parameters: + data: + componentInputParameter: pipelinechannel--texts-loop-item + taskInfo: + name: make-artifact + inputDefinitions: + parameters: + pipelinechannel--texts: + parameterType: LIST + pipelinechannel--texts-loop-item: + parameterType: STRING + outputDefinitions: + artifacts: + pipelinechannel--make-artifact-Output: + artifactType: + schemaTitle: system.Artifact + 
schemaVersion: 0.0.1 + isArtifactList: true + comp-for-loop-1-3: + dag: + outputs: + artifacts: + pipelinechannel--make-dataset-Output: + artifactSelectors: + - outputArtifactKey: Output + producerSubtask: make-dataset + tasks: + make-dataset: + cachingOptions: + enableCache: true + componentRef: + name: comp-make-dataset + inputs: + parameters: + data: + componentInputParameter: pipelinechannel--texts-loop-item + taskInfo: + name: make-dataset + inputDefinitions: + parameters: + pipelinechannel--texts: + parameterType: LIST + pipelinechannel--texts-loop-item: + parameterType: STRING + outputDefinitions: + artifacts: + pipelinechannel--make-dataset-Output: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + isArtifactList: true + comp-for-loop-2: + dag: + tasks: + print-artifact-name-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-artifact-name-2 + inputs: + artifacts: + artifact: + componentInputArtifact: pipelinechannel--make-datasets-Output-loop-item + taskInfo: + name: print-artifact-name-2 + inputDefinitions: + artifacts: + pipelinechannel--make-datasets-Output: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + isArtifactList: true + pipelinechannel--make-datasets-Output-loop-item: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-make-artifact: + executorLabel: exec-make-artifact + inputDefinitions: + parameters: + data: + parameterType: STRING + outputDefinitions: + artifacts: + Output: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + comp-make-artifacts: + dag: + outputs: + artifacts: + Output: + artifactSelectors: + - outputArtifactKey: pipelinechannel--make-artifact-Output + producerSubtask: for-loop-1 + tasks: + for-loop-1: + componentRef: + name: comp-for-loop-1-2 + inputs: + parameters: + pipelinechannel--texts: + componentInputParameter: texts + parameterIterator: + itemInput: pipelinechannel--texts-loop-item + items: + inputParameter: 
pipelinechannel--texts + taskInfo: + name: for-loop-1 + inputDefinitions: + parameters: + texts: + defaultValue: + - Hello + - ',' + - ' ' + - world! + isOptional: true + parameterType: LIST + outputDefinitions: + artifacts: + Output: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + isArtifactList: true + comp-make-dataset: + executorLabel: exec-make-dataset + inputDefinitions: + parameters: + data: + parameterType: STRING + outputDefinitions: + artifacts: + Output: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-make-datasets: + dag: + outputs: + artifacts: + Output: + artifactSelectors: + - outputArtifactKey: pipelinechannel--make-dataset-Output + producerSubtask: for-loop-1 + tasks: + for-loop-1: + componentRef: + name: comp-for-loop-1-3 + inputs: + parameters: + pipelinechannel--texts: + componentInputParameter: texts + parameterIterator: + itemInput: pipelinechannel--texts-loop-item + items: + inputParameter: pipelinechannel--texts + taskInfo: + name: for-loop-1 + inputDefinitions: + parameters: + texts: + defaultValue: + - Hello + - ',' + - ' ' + - world! 
+ isOptional: true + parameterType: LIST + outputDefinitions: + artifacts: + Output: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + isArtifactList: true + comp-print-artifact-name: + executorLabel: exec-print-artifact-name + inputDefinitions: + artifacts: + artifact: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-artifact-name-2: + executorLabel: exec-print-artifact-name-2 + inputDefinitions: + artifacts: + artifact: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-make-artifact: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - make_artifact + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef make_artifact(data: str) -> Artifact:\n artifact = Artifact(uri=dsl.get_uri(),\ + \ metadata={'length': len(data)})\n with open(artifact.path, 'w') as\ + \ f:\n f.write(data)\n return artifact\n\n" + image: python:3.7 + exec-make-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - make_dataset + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef make_dataset(data: str) -> Dataset:\n dataset = Dataset(uri=dsl.get_uri(),\ + \ metadata={'length': len(data)})\n with open(dataset.path, 'w') as f:\n\ + \ f.write(data)\n return dataset\n\n" + image: python:3.7 + exec-print-artifact-name: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_artifact_name + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_artifact_name(artifact: Artifact) -> str:\n print(artifact.name)\n\ + \ return artifact.name\n\n" + image: python:3.7 + exec-print-artifact-name-2: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_artifact_name + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_artifact_name(artifact: Artifact) -> str:\n print(artifact.name)\n\ + \ return artifact.name\n\n" + image: python:3.7 +pipelineInfo: + name: pipeline-parallelfor-artifacts +root: + dag: + tasks: + for-loop-1: + artifactIterator: + itemInput: pipelinechannel--make-artifacts-Output-loop-item + items: + inputArtifact: pipelinechannel--make-artifacts-Output + 
componentRef: + name: comp-for-loop-1 + dependentTasks: + - make-artifacts + inputs: + artifacts: + pipelinechannel--make-artifacts-Output: + taskOutputArtifact: + outputArtifactKey: Output + producerTask: make-artifacts + taskInfo: + name: for-loop-1 + for-loop-2: + artifactIterator: + itemInput: pipelinechannel--make-datasets-Output-loop-item + items: + inputArtifact: pipelinechannel--make-datasets-Output + componentRef: + name: comp-for-loop-2 + dependentTasks: + - make-datasets + inputs: + artifacts: + pipelinechannel--make-datasets-Output: + taskOutputArtifact: + outputArtifactKey: Output + producerTask: make-datasets + taskInfo: + name: for-loop-2 + make-artifacts: + cachingOptions: + enableCache: true + componentRef: + name: comp-make-artifacts + taskInfo: + name: make-artifacts + make-datasets: + cachingOptions: + enableCache: true + componentRef: + name: comp-make-datasets + taskInfo: + name: make-datasets +schemaVersion: 2.1.0 +sdkVersion: kfp-2.6.0 diff --git a/sdk/python/test_data/test_data_config.yaml b/sdk/python/test_data/test_data_config.yaml index dc8b23ba6cc..ddfa0802f53 100644 --- a/sdk/python/test_data/test_data_config.yaml +++ b/sdk/python/test_data/test_data_config.yaml @@ -192,6 +192,9 @@ pipelines: - module: cross_loop_after_topology name: my_pipeline execute: false + - module: pipeline_with_parallelfor_list_artifacts + name: my_pipeline + execute: false components: test_data_dir: sdk/python/test_data/components read: true