diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index 7eda198b920..4419f59dfad 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -1,6 +1,7 @@ # Current Version (in development) ## Features +* Support dynamic machine type parameters in pipeline task setters. [\#11097](https://github.com/kubeflow/pipelines/pull/11097) ## Breaking changes diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 919237fecdc..598983af778 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -3382,31 +3382,31 @@ def simple_pipeline(): ['exec-return-1']['container']) self.assertEqual( - 5, dict_format['deploymentSpec']['executors']['exec-return-1-2'] - ['container']['resources']['cpuLimit']) + '5', dict_format['deploymentSpec']['executors']['exec-return-1-2'] + ['container']['resources']['resourceCpuLimit']) self.assertNotIn( 'memoryLimit', dict_format['deploymentSpec']['executors'] ['exec-return-1-2']['container']['resources']) self.assertEqual( - 50, dict_format['deploymentSpec']['executors']['exec-return-1-3'] - ['container']['resources']['memoryLimit']) + '50G', dict_format['deploymentSpec']['executors']['exec-return-1-3'] + ['container']['resources']['resourceMemoryLimit']) self.assertNotIn( 'cpuLimit', dict_format['deploymentSpec']['executors'] ['exec-return-1-3']['container']['resources']) self.assertEqual( - 2, dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['cpuRequest']) + '2', dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['resourceCpuRequest']) self.assertEqual( - 5, dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['cpuLimit']) + '5', dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['resourceCpuLimit']) self.assertEqual( - 4, dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['memoryRequest']) + '4G', dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['resourceMemoryRequest']) self.assertEqual( - 50, dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['memoryLimit']) + '50G', dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['resourceMemoryLimit']) class TestPlatformConfig(unittest.TestCase): diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index 6d2a0cfa9d2..6e4bc4e8690 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -128,6 +128,12 @@ def build_task_spec_for_task( pipeline_task_spec.retry_policy.CopyFrom( task._task_spec.retry_policy.to_proto()) + # Inject resource fields into inputs + if task.container_spec and task.container_spec.resources: + for key, val in task.container_spec.resources.__dict__.items(): + if val and pipeline_channel.extract_pipeline_channels_from_any(val): + task.inputs[key] = val + for input_name, input_value in task.inputs.items(): # Since LoopParameterArgument and LoopArtifactArgument and LoopArgumentVariable are narrower # types than PipelineParameterChannel, start with them. @@ -607,6 +613,24 @@ def build_container_spec_for_task( Returns: A PipelineContainerSpec object for the task. """ + + def convert_to_placeholder(input_value: str) -> str: + """Checks if input is a pipeline channel and if so, converts to + compiler injected input name.""" + pipeline_channels = ( + pipeline_channel.extract_pipeline_channels_from_any(input_value)) + if pipeline_channels: + assert len(pipeline_channels) == 1 + channel = pipeline_channels[0] + additional_input_name = ( + compiler_utils.additional_input_name_for_pipeline_channel( + channel)) + additional_input_placeholder = placeholders.InputValuePlaceholder( + additional_input_name)._to_string() + input_value = input_value.replace(channel.pattern, + additional_input_placeholder) + return input_value + container_spec = ( pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec( image=task.container_spec.image, @@ -620,23 +644,28 @@ def build_container_spec_for_task( if task.container_spec.resources is not None: if task.container_spec.resources.cpu_request is not None: - container_spec.resources.cpu_request = ( - task.container_spec.resources.cpu_request) + container_spec.resources.resource_cpu_request = ( + convert_to_placeholder( + task.container_spec.resources.cpu_request)) if task.container_spec.resources.cpu_limit is not None: - container_spec.resources.cpu_limit = ( - task.container_spec.resources.cpu_limit) + container_spec.resources.resource_cpu_limit = ( + convert_to_placeholder(task.container_spec.resources.cpu_limit)) if task.container_spec.resources.memory_request is not None: - container_spec.resources.memory_request = ( - task.container_spec.resources.memory_request) + container_spec.resources.resource_memory_request = ( + convert_to_placeholder( + task.container_spec.resources.memory_request)) if task.container_spec.resources.memory_limit is not None: - container_spec.resources.memory_limit = ( - task.container_spec.resources.memory_limit) + container_spec.resources.resource_memory_limit = ( + convert_to_placeholder( + task.container_spec.resources.memory_limit)) if task.container_spec.resources.accelerator_count is not None: container_spec.resources.accelerator.CopyFrom( pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec .ResourceSpec.AcceleratorConfig( - type=task.container_spec.resources.accelerator_type, - count=task.container_spec.resources.accelerator_count, + resource_type=convert_to_placeholder( + task.container_spec.resources.accelerator_type), + resource_count=convert_to_placeholder( + task.container_spec.resources.accelerator_count), )) return container_spec diff --git a/sdk/python/kfp/dsl/pipeline_task.py b/sdk/python/kfp/dsl/pipeline_task.py index 2e82d23378a..773fb1e0676 100644 --- a/sdk/python/kfp/dsl/pipeline_task.py +++ b/sdk/python/kfp/dsl/pipeline_task.py @@ -321,9 +321,9 @@ def _ensure_container_spec_exists(self) -> None: f'{caller_method_name} can only be used on single-step components, not pipelines used as components, or special components like importers.' ) - def _validate_cpu_request_limit(self, cpu: str) -> float: + def _validate_cpu_request_limit(self, cpu: str) -> str: """Validates cpu request/limit string and converts to its numeric - value. + string value. Args: cpu: CPU requests or limits. This string should be a number or a @@ -335,17 +335,22 @@ def _validate_cpu_request_limit(self, cpu: str) -> float: ValueError if the cpu request/limit string value is invalid. Returns: - The numeric value (float) of the cpu request/limit. + The numeric string of the cpu request/limit. """ - if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None: - raise ValueError( - 'Invalid cpu string. Should be float or integer, or integer' - ' followed by "m".') - - return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu) + if isinstance(cpu, pipeline_channel.PipelineChannel): + cpu = str(cpu) + else: + if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None: + raise ValueError( + 'Invalid cpu string. Should be float or integer, or integer' + ' followed by "m".') + return cpu @block_if_final() - def set_cpu_request(self, cpu: str) -> 'PipelineTask': + def set_cpu_request( + self, + cpu: Union[str, + pipeline_channel.PipelineChannel]) -> 'PipelineTask': """Sets CPU request (minimum) for the task. Args: @@ -370,7 +375,10 @@ def set_cpu_request(self, cpu: str) -> 'PipelineTask': return self @block_if_final() - def set_cpu_limit(self, cpu: str) -> 'PipelineTask': + def set_cpu_limit( + self, + cpu: Union[str, + pipeline_channel.PipelineChannel]) -> 'PipelineTask': """Sets CPU limit (maximum) for the task. Args: @@ -395,7 +403,9 @@ def set_cpu_limit(self, cpu: str) -> 'PipelineTask': return self @block_if_final() - def set_accelerator_limit(self, limit: int) -> 'PipelineTask': + def set_accelerator_limit( + self, limit: Union[int, str, + pipeline_channel.PipelineChannel]) -> 'PipelineTask': """Sets accelerator limit (maximum) for the task. Only applies if accelerator type is also set via .set_accelerator_type(). @@ -406,11 +416,15 @@ def set_accelerator_limit(self, limit: int) -> 'PipelineTask': Self return to allow chained setting calls. """ self._ensure_container_spec_exists() - - if isinstance(limit, str): - if re.match(r'[1-9]\d*$', limit) is None: - raise ValueError(f'{"limit"!r} must be positive integer.') - limit = int(limit) + if isinstance(limit, pipeline_channel.PipelineChannel): + limit = str(limit) + else: + if isinstance(limit, int): + limit = str(limit) + if isinstance(limit, str) and re.match(r'^0$|^1$|^2$|^4$|^8$|^16$', + limit) is None: + raise ValueError( + f'{"limit"!r} must be one of 0, 1, 2, 4, 8, 16.') if self.container_spec.resources is not None: self.container_spec.resources.accelerator_count = limit @@ -438,9 +452,9 @@ def set_gpu_limit(self, gpu: str) -> 'PipelineTask': category=DeprecationWarning) return self.set_accelerator_limit(gpu) - def _validate_memory_request_limit(self, memory: str) -> float: + def _validate_memory_request_limit(self, memory: str) -> str: """Validates memory request/limit string and converts to its numeric - value. + string value. Args: memory: Memory requests or limits. This string should be a number or @@ -451,47 +465,24 @@ def _validate_memory_request_limit(self, memory: str) -> float: ValueError if the memory request/limit string value is invalid. Returns: - The numeric value (float) of the memory request/limit. + The numeric string value of the memory request/limit. """ - if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', - memory) is None: - raise ValueError( - 'Invalid memory string. Should be a number or a number ' - 'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", ' - '"Gi", "M", "Mi", "K", "Ki".') - - if memory.endswith('E'): - memory = float(memory[:-1]) * constants._E / constants._G - elif memory.endswith('Ei'): - memory = float(memory[:-2]) * constants._EI / constants._G - elif memory.endswith('P'): - memory = float(memory[:-1]) * constants._P / constants._G - elif memory.endswith('Pi'): - memory = float(memory[:-2]) * constants._PI / constants._G - elif memory.endswith('T'): - memory = float(memory[:-1]) * constants._T / constants._G - elif memory.endswith('Ti'): - memory = float(memory[:-2]) * constants._TI / constants._G - elif memory.endswith('G'): - memory = float(memory[:-1]) - elif memory.endswith('Gi'): - memory = float(memory[:-2]) * constants._GI / constants._G - elif memory.endswith('M'): - memory = float(memory[:-1]) * constants._M / constants._G - elif memory.endswith('Mi'): - memory = float(memory[:-2]) * constants._MI / constants._G - elif memory.endswith('K'): - memory = float(memory[:-1]) * constants._K / constants._G - elif memory.endswith('Ki'): - memory = float(memory[:-2]) * constants._KI / constants._G + if isinstance(memory, pipeline_channel.PipelineChannel): + memory = str(memory) else: - # By default interpret as a plain integer, in the unit of Bytes. - memory = float(memory) / constants._G - + if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', + memory) is None: + raise ValueError( + 'Invalid memory string. Should be a number or a number ' + 'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", ' + '"Gi", "M", "Mi", "K", "Ki".') return memory @block_if_final() - def set_memory_request(self, memory: str) -> 'PipelineTask': + def set_memory_request( + self, + memory: Union[str, + pipeline_channel.PipelineChannel]) -> 'PipelineTask': """Sets memory request (minimum) for the task. Args: @@ -515,7 +506,10 @@ def set_memory_request(self, memory: str) -> 'PipelineTask': return self @block_if_final() - def set_memory_limit(self, memory: str) -> 'PipelineTask': + def set_memory_limit( + self, + memory: Union[str, + pipeline_channel.PipelineChannel]) -> 'PipelineTask': """Sets memory limit (maximum) for the task. Args: @@ -579,7 +573,9 @@ def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask': return self.set_accelerator_type(accelerator) @block_if_final() - def set_accelerator_type(self, accelerator: str) -> 'PipelineTask': + def set_accelerator_type( + self, accelerator: Union[str, pipeline_channel.PipelineChannel] + ) -> 'PipelineTask': """Sets accelerator type to use when executing this task. Args: @@ -589,14 +585,16 @@ def set_accelerator_type(self, accelerator: str) -> 'PipelineTask': Self return to allow chained setting calls. """ self._ensure_container_spec_exists() + if isinstance(accelerator, pipeline_channel.PipelineChannel): + accelerator = str(accelerator) if self.container_spec.resources is not None: self.container_spec.resources.accelerator_type = accelerator if self.container_spec.resources.accelerator_count is None: - self.container_spec.resources.accelerator_count = 1 + self.container_spec.resources.accelerator_count = '1' else: self.container_spec.resources = structures.ResourceSpec( - accelerator_count=1, accelerator_type=accelerator) + accelerator_count='1', accelerator_type=accelerator) return self diff --git a/sdk/python/kfp/dsl/pipeline_task_test.py b/sdk/python/kfp/dsl/pipeline_task_test.py index c5bedc38111..8543058b826 100644 --- a/sdk/python/kfp/dsl/pipeline_task_test.py +++ b/sdk/python/kfp/dsl/pipeline_task_test.py @@ -145,42 +145,40 @@ def test_set_caching_options(self): @parameterized.parameters( { 'cpu': '123', - 'expected_cpu_number': 123, + 'expected_cpu': '123', }, { 'cpu': '123m', - 'expected_cpu_number': 0.123, + 'expected_cpu': '123m', }, { 'cpu': '123.0', - 'expected_cpu_number': 123, + 'expected_cpu': '123.0', }, { 'cpu': '123.0m', - 'expected_cpu_number': 0.123, + 'expected_cpu': '123.0m', }, ) - def test_set_valid_cpu_request_limit(self, cpu: str, - expected_cpu_number: float): + def test_set_valid_cpu_request_limit(self, cpu: str, expected_cpu: str): task = pipeline_task.PipelineTask( component_spec=structures.ComponentSpec.from_yaml_documents( V2_YAML), args={'input1': 'value'}, ) task.set_cpu_request(cpu) - self.assertEqual(expected_cpu_number, + self.assertEqual(expected_cpu, task.container_spec.resources.cpu_request) task.set_cpu_limit(cpu) - self.assertEqual(expected_cpu_number, - task.container_spec.resources.cpu_limit) + self.assertEqual(expected_cpu, task.container_spec.resources.cpu_limit) @parameterized.parameters( { - 'gpu_limit': '123', - 'expected_gpu_number': 123, + 'gpu_limit': '1', + 'expected_gpu_number': '1', },) def test_set_valid_gpu_limit(self, gpu_limit: str, - expected_gpu_number: int): + expected_gpu_number: str): task = pipeline_task.PipelineTask( component_spec=structures.ComponentSpec.from_yaml_documents( V2_YAML), @@ -210,15 +208,19 @@ def test_add_valid_node_selector_constraint(self): @parameterized.parameters( { - 'limit': '123', - 'expected': 123, + 'limit': '1', + 'expected_limit': '1', }, { - 'limit': 123, - 'expected': 123, + 'limit': 1, + 'expected_limit': '1', + }, + { + 'limit': 16, + 'expected_limit': '16', }, ) - def test_set_accelerator_limit(self, limit, expected): + def test_set_accelerator_limit(self, limit, expected_limit): task = pipeline_task.PipelineTask( component_spec=structures.ComponentSpec.from_yaml_documents( V2_YAML), @@ -226,74 +228,74 @@ def test_set_accelerator_limit(self, limit, expected): ) task.set_accelerator_limit(limit) - self.assertEqual(expected, + self.assertEqual(expected_limit, task.container_spec.resources.accelerator_count) @parameterized.parameters( { 'memory': '1E', - 'expected_memory_number': 1000000000, + 'expected_memory': '1E', }, { 'memory': '15Ei', - 'expected_memory_number': 17293822569.102703, + 'expected_memory': '15Ei', }, { 'memory': '2P', - 'expected_memory_number': 2000000, + 'expected_memory': '2P', }, { 'memory': '25Pi', - 'expected_memory_number': 28147497.6710656, + 'expected_memory': '25Pi', }, { 'memory': '3T', - 'expected_memory_number': 3000, + 'expected_memory': '3T', }, { 'memory': '35Ti', - 'expected_memory_number': 38482.90697216, + 'expected_memory': '35Ti', }, { 'memory': '4G', - 'expected_memory_number': 4, + 'expected_memory': '4G', }, { 'memory': '45Gi', - 'expected_memory_number': 48.31838208, + 'expected_memory': '45Gi', }, { 'memory': '5M', - 'expected_memory_number': 0.005, + 'expected_memory': '5M', }, { 'memory': '55Mi', - 'expected_memory_number': 0.05767168, + 'expected_memory': '55Mi', }, { 'memory': '6K', - 'expected_memory_number': 0.000006, + 'expected_memory': '6K', }, { 'memory': '65Ki', - 'expected_memory_number': 0.00006656, + 'expected_memory': '65Ki', }, { 'memory': '7000', - 'expected_memory_number': 0.000007, + 'expected_memory': '7000', }, ) - def test_set_memory_limit(self, memory: str, expected_memory_number: int): + def test_set_memory_limit(self, memory: str, expected_memory: str): task = pipeline_task.PipelineTask( component_spec=structures.ComponentSpec.from_yaml_documents( V2_YAML), args={'input1': 'value'}, ) task.set_memory_request(memory) - self.assertEqual(expected_memory_number, + self.assertEqual(expected_memory, task.container_spec.resources.memory_request) task.set_memory_limit(memory) - self.assertEqual(expected_memory_number, + self.assertEqual(expected_memory, task.container_spec.resources.memory_limit) def test_set_accelerator_type_with_type_only(self): @@ -305,7 +307,7 @@ def test_set_accelerator_type_with_type_only(self): task.set_accelerator_type('NVIDIA_TESLA_K80') self.assertEqual( structures.ResourceSpec( - accelerator_type='NVIDIA_TESLA_K80', accelerator_count=1), + accelerator_type='NVIDIA_TESLA_K80', accelerator_count='1'), task.container_spec.resources) def test_set_accelerator_type_with_accelerator_count(self): @@ -314,10 +316,10 @@ def test_set_accelerator_type_with_accelerator_count(self): V2_YAML), args={'input1': 'value'}, ) - task.set_accelerator_limit('5').set_accelerator_type('TPU_V3') + task.set_accelerator_limit('4').set_accelerator_type('TPU_V3') self.assertEqual( structures.ResourceSpec( - accelerator_type='TPU_V3', accelerator_count=5), + accelerator_type='TPU_V3', accelerator_count='4'), task.container_spec.resources) def test_set_env_variable(self): diff --git a/sdk/python/kfp/dsl/structures.py b/sdk/python/kfp/dsl/structures.py index 440f9a3940a..5a73d93b35c 100644 --- a/sdk/python/kfp/dsl/structures.py +++ b/sdk/python/kfp/dsl/structures.py @@ -239,12 +239,12 @@ class ResourceSpec: container. accelerator_count (optional): the number of accelerators attached. """ - cpu_request: Optional[float] = None - cpu_limit: Optional[float] = None - memory_request: Optional[float] = None - memory_limit: Optional[float] = None + cpu_request: Optional[str] = None + cpu_limit: Optional[str] = None + memory_request: Optional[str] = None + memory_limit: Optional[str] = None accelerator_type: Optional[str] = None - accelerator_count: Optional[int] = None + accelerator_count: Optional[str] = None @dataclasses.dataclass diff --git a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml index a9d93bc4de1..07a4ca82975 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml +++ b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml @@ -61,12 +61,12 @@ deploymentSpec: image: gcr.io/my-project/my-fancy-trainer resources: accelerator: - count: '1' - type: tpu-v3 - cpuLimit: 4.0 - cpuRequest: 2.0 - memoryLimit: 15.032385536 - memoryRequest: 4.294967296 + resourceCount: '1' + resourceType: tpu-v3 + resourceCpuLimit: '4' + resourceCpuRequest: '2' + resourceMemoryLimit: 14Gi + resourceMemoryRequest: 4Gi pipelineInfo: description: A linear two-step pipeline with resource specification. name: two-step-pipeline-with-resource-spec @@ -119,4 +119,4 @@ root: isOptional: true parameterType: STRING schemaVersion: 2.1.0 -sdkVersion: kfp-2.7.0 +sdkVersion: kfp-2.8.0 diff --git a/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.py b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.py new file mode 100644 index 00000000000..eb8e1f79fe2 --- /dev/null +++ b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.py @@ -0,0 +1,26 @@ +from kfp import dsl + + +@dsl.component +def sum_numbers(a: int, b: int) -> int: + return a + b + + +@dsl.pipeline +def pipeline( + cpu_limit: str = '4000m', + memory_limit: str = '15G', + accelerator_type: str = 'NVIDIA_TESLA_P4', + accelerator_limit: str = '1', +): + sum_numbers_task = sum_numbers(a=1, b=2) + sum_numbers_task.set_cpu_limit(cpu_limit) + sum_numbers_task.set_memory_limit(memory_limit) + sum_numbers_task.set_accelerator_type(accelerator_type) + sum_numbers_task.set_accelerator_limit(accelerator_limit) + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=pipeline, package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.yaml b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.yaml new file mode 100644 index 00000000000..65815886409 --- /dev/null +++ b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.yaml @@ -0,0 +1,116 @@ +# PIPELINE DEFINITION +# Name: pipeline +# Inputs: +# accelerator_limit: str [Default: '1'] +# accelerator_type: str [Default: 'NVIDIA_TESLA_P4'] +# cpu_limit: str [Default: '2000m'] +# memory_limit: str [Default: '10G'] +components: + comp-sum-numbers: + executorLabel: exec-sum-numbers + inputDefinitions: + parameters: + a: + parameterType: NUMBER_INTEGER + b: + parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + Output: + parameterType: NUMBER_INTEGER +deploymentSpec: + executors: + exec-sum-numbers: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - sum_numbers + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef sum_numbers(a: int, b:int) -> int:\n return a + b\n\n" + image: python:3.8 + resources: + accelerator: + resourceCount: '{{$.inputs.parameters[''pipelinechannel--accelerator_limit'']}}' + resourceType: '{{$.inputs.parameters[''pipelinechannel--accelerator_type'']}}' + resourceCpuLimit: '{{$.inputs.parameters[''pipelinechannel--cpu_limit'']}}' + resourceMemoryLimit: '{{$.inputs.parameters[''pipelinechannel--memory_limit'']}}' +pipelineInfo: + name: pipeline +root: + dag: + tasks: + sum-numbers: + cachingOptions: + enableCache: true + componentRef: + name: comp-sum-numbers + inputs: + parameters: + a: + runtimeValue: + constant: 1.0 + accelerator_count: + runtimeValue: + constant: '{{$.inputs.parameters[''pipelinechannel--accelerator_limit'']}}' + accelerator_type: + runtimeValue: + constant: '{{$.inputs.parameters[''pipelinechannel--accelerator_type'']}}' + b: + runtimeValue: + constant: 2.0 + cpu_limit: + runtimeValue: + constant: '{{$.inputs.parameters[''pipelinechannel--cpu_limit'']}}' + memory_limit: + runtimeValue: + constant: '{{$.inputs.parameters[''pipelinechannel--memory_limit'']}}' + pipelinechannel--accelerator_limit: + componentInputParameter: accelerator_limit + pipelinechannel--accelerator_type: + componentInputParameter: accelerator_type + pipelinechannel--cpu_limit: + componentInputParameter: cpu_limit + pipelinechannel--memory_limit: + componentInputParameter: memory_limit + taskInfo: + name: sum-numbers + inputDefinitions: + parameters: + accelerator_limit: + defaultValue: '1' + isOptional: true + parameterType: STRING + accelerator_type: + defaultValue: NVIDIA_TESLA_P4 + isOptional: true + parameterType: STRING + cpu_limit: + defaultValue: 2000m + isOptional: true + parameterType: STRING + memory_limit: + defaultValue: 10G + isOptional: true + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.8.0 diff --git a/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.py b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.py new file mode 100644 index 00000000000..7795bb8a0f4 --- /dev/null +++ b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.py @@ -0,0 +1,41 @@ +from kfp import dsl + + +@dsl.component +def cpu_limit() -> str: + return '4000m' + + +@dsl.component +def memory_limit() -> str: + return '15G' + + +@dsl.component +def accelerator_type() -> str: + return 'NVIDIA_TESLA_P4' + + +@dsl.component +def accelerator_limit() -> str: + return '1' + + +@dsl.component +def sum_numbers(a: int, b: int) -> int: + return a + b + + +@dsl.pipeline +def pipeline(): + sum_numbers_task = sum_numbers(a=1, b=2) + sum_numbers_task.set_cpu_limit(cpu_limit().output) + sum_numbers_task.set_memory_limit(memory_limit().output) + sum_numbers_task.set_accelerator_type(accelerator_type().output) + sum_numbers_task.set_accelerator_limit(accelerator_limit().output) + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=pipeline, package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.yaml b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.yaml new file mode 100644 index 00000000000..13f80133f8b --- /dev/null +++ b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.yaml @@ -0,0 +1,265 @@ +# PIPELINE DEFINITION +# Name: pipeline +components: + comp-accelerator-limit: + executorLabel: exec-accelerator-limit + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-accelerator-type: + executorLabel: exec-accelerator-type + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-cpu-limit: + executorLabel: exec-cpu-limit + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-memory-limit: + executorLabel: exec-memory-limit + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-sum-numbers: + executorLabel: exec-sum-numbers + inputDefinitions: + parameters: + a: + parameterType: NUMBER_INTEGER + b: + parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + Output: + parameterType: NUMBER_INTEGER +deploymentSpec: + executors: + exec-accelerator-limit: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - accelerator_limit + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef accelerator_limit() -> str:\n return '1'\n\n" + image: python:3.8 + exec-accelerator-type: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - accelerator_type + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef accelerator_type() -> str:\n return 'NVIDIA_TESLA_P4'\n\n" + image: python:3.8 + exec-cpu-limit: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - cpu_limit + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef cpu_limit() -> str:\n return '4000m'\n\n" + image: python:3.8 + exec-memory-limit: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - memory_limit + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef memory_limit() -> str:\n return '15G'\n\n" + image: python:3.8 + exec-sum-numbers: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - sum_numbers + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef sum_numbers(a: int, b:int) -> int:\n return a + b\n\n" + image: python:3.8 + resources: + accelerator: + resourceCount: '{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}' + resourceType: '{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}' + resourceCpuLimit: '{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}' + resourceMemoryLimit: '{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}' +pipelineInfo: + name: pipeline +root: + dag: + tasks: + accelerator-limit: + cachingOptions: + enableCache: true + componentRef: + name: comp-accelerator-limit + taskInfo: + name: accelerator-limit + accelerator-type: + cachingOptions: + enableCache: true + componentRef: + name: comp-accelerator-type + taskInfo: + name: accelerator-type + cpu-limit: + cachingOptions: + enableCache: true + componentRef: + name: comp-cpu-limit + taskInfo: + name: cpu-limit + memory-limit: + cachingOptions: + enableCache: true + componentRef: + name: comp-memory-limit + taskInfo: + name: memory-limit + sum-numbers: + cachingOptions: + enableCache: true + componentRef: + name: comp-sum-numbers + inputs: + parameters: + a: + runtimeValue: + constant: 1.0 + accelerator_count: + runtimeValue: + constant: '{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}' + accelerator_type: + runtimeValue: + constant: '{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}' + b: + runtimeValue: + constant: 2.0 + cpu_limit: + runtimeValue: + constant: '{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}' + memory_limit: + runtimeValue: + constant: '{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}' + pipelinechannel--accelerator-limit-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: accelerator-limit + pipelinechannel--accelerator-type-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: accelerator-type + pipelinechannel--cpu-limit-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: cpu-limit + pipelinechannel--memory-limit-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: memory-limit + taskInfo: + name: sum-numbers +schemaVersion: 2.1.0 +sdkVersion: kfp-2.8.0