From 94b1a0de73e7d291d5f0581e92b8d637a335964f Mon Sep 17 00:00:00 2001 From: KevinGrantLee Date: Fri, 20 Sep 2024 09:26:30 -0700 Subject: [PATCH] feat(sdk): support dynamic machine type parameters in pipeline task setters (#11097) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * temp title: change title Signed-off-by: KevinGrantLee * add release notes Signed-off-by: KevinGrantLee * formatting Signed-off-by: KevinGrantLee * feat(backend): move comp logic to workflow params (#10979) * feat(backend): move comp logic to workflow params Signed-off-by: zazulam Co-authored-by: droctothorpe Co-authored-by: andreafehrman Co-authored-by: MonicaZhang1 Co-authored-by: kylekaminky Co-authored-by: CarterFendley Signed-off-by: zazulam * address pr comments Signed-off-by: zazulam * Use function name instead of base name and address edge cases Signed-off-by: droctothorpe Co-authored-by: zazulam * Improve logic and update tests Signed-off-by: droctothorpe Co-authored-by: zazulam * POC hashing command and args Signed-off-by: droctothorpe Co-authored-by: zazulam * Add comments to clarify the logic Signed-off-by: droctothorpe Co-authored-by: zazulam * Hash entire PipelineContainerSpec Signed-off-by: droctothorpe Co-authored-by: zazulam --------- Signed-off-by: zazulam Signed-off-by: droctothorpe Co-authored-by: droctothorpe Co-authored-by: andreafehrman Co-authored-by: MonicaZhang1 Co-authored-by: kylekaminky Co-authored-by: CarterFendley Signed-off-by: KevinGrantLee * feat(component): internal Signed-off-by: Googler PiperOrigin-RevId: 660985413 Signed-off-by: KevinGrantLee * feat(components): internal Signed-off-by: Googler PiperOrigin-RevId: 661332120 Signed-off-by: KevinGrantLee * fix(components): Fix to model batch explanation component for Structured Data pipelines Signed-off-by: Googler PiperOrigin-RevId: 661475667 Signed-off-by: KevinGrantLee * feat(components): Support dynamic values for boot_disk_type, boot_disk_size in 
preview.custom_job.utils.create_custom_training_job_from_component Signed-off-by: Googler PiperOrigin-RevId: 662242688 Signed-off-by: KevinGrantLee * chore: Upgrade Argo to v3.4.17 (#10978) Signed-off-by: Giulio Frasca Signed-off-by: KevinGrantLee * test: Moved kubeflow-pipelines-manifests to GitHub Actions (#11066) Signed-off-by: vmudadla Signed-off-by: KevinGrantLee * fix: re-enable exit handler test. (#11100) Signed-off-by: Liav Weiss (EXT-Nokia) Co-authored-by: Liav Weiss (EXT-Nokia) Signed-off-by: KevinGrantLee * fix(frontend): retrieve archived logs from correct location (#11010) * fix(frontend): retrieve archived logs from correct location Signed-off-by: droctothorpe Co-authored-by: andreafehrman Co-authored-by: owmasch * Add namespace tag handling and validation Signed-off-by: droctothorpe Co-authored-by: andreafehrman Co-authored-by: owmasch * Remove whitespace from keyFormat Signed-off-by: droctothorpe Co-authored-by: andreafehrman Co-authored-by: owmasch * Update frontend unit tests Signed-off-by: droctothorpe * Remove superfluous log statements Signed-off-by: droctothorpe Co-authored-by: quinnovator * Add link to keyFormat in manifests Signed-off-by: droctothorpe * Fix workflow parsing for log artifact Signed-off-by: droctothorpe Co-authored-by: quinnovator * Fix unit test Signed-off-by: droctothorpe --------- Signed-off-by: droctothorpe Co-authored-by: andreafehrman Co-authored-by: owmasch Co-authored-by: quinnovator Signed-off-by: KevinGrantLee * feat(component): internal Signed-off-by: Googler PiperOrigin-RevId: 663774557 Signed-off-by: KevinGrantLee * feat(component): internal Signed-off-by: Googler PiperOrigin-RevId: 663872006 Signed-off-by: KevinGrantLee * chore(components): GCPC 2.16.1 Release Signed-off-by: Googler PiperOrigin-RevId: 663883139 Signed-off-by: KevinGrantLee * test: Fail fast when image build fails on tests #11102 (#11115) * Fail fast when image build fails on tests #11102 Signed-off-by: Elay Aharoni (EXT-Nokia) * Fail fast when
image build fails on tests #11102 Signed-off-by: Elay Aharoni (EXT-Nokia) --------- Signed-off-by: Elay Aharoni (EXT-Nokia) Co-authored-by: Elay Aharoni (EXT-Nokia) Signed-off-by: KevinGrantLee * fix(components): Use instance.target_field_name format for text-bison models only, use target_field_name for gemini models Signed-off-by: Googler PiperOrigin-RevId: 665638487 Signed-off-by: KevinGrantLee * chore: Renamed GitHub workflows from *.yaml to *.yml for consistency (#11126) Signed-off-by: hbelmiro Signed-off-by: KevinGrantLee * Fix view edit cluster roles (#11067) * Fixing incorrect typing in loop_parallelism example Signed-off-by: Oswaldo Gomez * Fixing samples/core/loop_parameter example Signed-off-by: Oswaldo Gomez * Fixing aggregate-to-kubeflow-pipelines-edit Signed-off-by: Oswaldo Gomez * keeping MRs separate. Signed-off-by: Oswaldo Gomez * Adding blank line Signed-off-by: Oswaldo Gomez --------- Signed-off-by: Oswaldo Gomez Co-authored-by: Oswaldo Gomez Signed-off-by: KevinGrantLee * fix(components): Pass model name to eval_runner to process batch prediction's output as per the output schema of the model used Signed-off-by: Googler PiperOrigin-RevId: 665977093 Signed-off-by: KevinGrantLee * feat(components): release LLM Model Evaluation image version v0.7 Signed-off-by: Jason Dai PiperOrigin-RevId: 666102687 Signed-off-by: KevinGrantLee * chore: Adding @DharmitD to SDK reviewers (#11131) Signed-off-by: ddalvi Signed-off-by: KevinGrantLee * test: Kubeflow Pipelines V2 integration Tests (#11125) Signed-off-by: Diego Lovison Signed-off-by: KevinGrantLee * chore: Add make targets for building driver and launcher images (#11103) Signed-off-by: Giulio Frasca Signed-off-by: KevinGrantLee * feat(Backend + SDK): Update kfp backend and kubernetes sdk to support EmptyDir (#10913) Update kfp backend and kubernetes sdk to support mounting EmptyDir volumes to task pods.
Inspired by #10427 Fixes: #10656 Signed-off-by: Greg Sheremeta Signed-off-by: KevinGrantLee * docs:fixing broken links in readme (#11108) Signed-off-by: Fiona Waters Signed-off-by: KevinGrantLee * chore(deps): bump micromatch from 4.0.5 to 4.0.8 in /test/frontend-integration-test (#11132) Bumps [micromatch](https://github.com/micromatch/micromatch) from 4.0.5 to 4.0.8. - [Release notes](https://github.com/micromatch/micromatch/releases) - [Changelog](https://github.com/micromatch/micromatch/blob/4.0.8/CHANGELOG.md) - [Commits](https://github.com/micromatch/micromatch/compare/4.0.5...4.0.8) --- updated-dependencies: - dependency-name: micromatch dependency-type: indirect ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Signed-off-by: KevinGrantLee * Fix: Basic sample tests - sequential is flaky (#11138) Signed-off-by: Diego Lovison Signed-off-by: KevinGrantLee * chore: Wrapped "Failed GetContextByTypeAndName" error for better troubleshooting (#11098) Signed-off-by: hbelmiro Signed-off-by: KevinGrantLee * chore(components): Update AutoSxS and RLHF image tags Signed-off-by: Michael Hu PiperOrigin-RevId: 668536503 Signed-off-by: KevinGrantLee * test: Improvements to wait_for_pods function (#11162) Signed-off-by: hbelmiro Signed-off-by: KevinGrantLee * fix(frontend): fixes filter pipeline text box shows error when typing anything in it. 
Fixes #10241 (#11096) * Filter pipeline text box shows error when typing anything in it #10241 Signed-off-by: Elay Aharoni (EXT-Nokia) * Filter pipeline text box shows error when typing anything in it #10241 Signed-off-by: Elay Aharoni (EXT-Nokia) --------- Signed-off-by: Elay Aharoni (EXT-Nokia) Co-authored-by: Elay Aharoni (EXT-Nokia) Signed-off-by: KevinGrantLee * correct artifact preview behavior in UI (#11059) This change allows KFP UI to fall back to the UI host namespace when no namespace is provided when referencing the artifact object store provider secret. In default kubeflow deployments this namespace is simply "kubeflow", but the user can customize this behavior by providing the environment variable "SERVER_NAMESPACE" to the KFP UI deployment. Furthermore, this change addresses a bug that caused URL parsing to fail for endpoints without a protocol, adding support for such endpoint types as : for object store endpoints, as is the case in the default kfp deployment manifests. Signed-off-by: Humair Khan Signed-off-by: KevinGrantLee * chore: Added DCO link to PR template (#11176) Signed-off-by: Helber Belmiro Signed-off-by: KevinGrantLee * chore(backend): Update driver and launcher licenses (#11177) Signed-off-by: Chen Sun Signed-off-by: KevinGrantLee * chore(backend): update driver and launcher default images (#11178) Signed-off-by: Chen Sun Signed-off-by: KevinGrantLee * chore: Add instructions for releasing driver and launcher images (#11179) Signed-off-by: Chen Sun Signed-off-by: KevinGrantLee * test: Fixed `kfp-runtime-tests` to run on master branch (#11158) Signed-off-by: hbelmiro Signed-off-by: KevinGrantLee * (fix): reduce executor logs (#11169) * remove driver logs from executor These logs congest the executor runtime logs, making it difficult for the user to differentiate between logs. The driver logs are unnecessary here and can be removed to reduce this clutter.
Signed-off-by: Humair Khan * remove duplicate emissary call in executor As per the initial inline dev comment, argo podspecpatch did not add the emissary call, and it had to be manually added. This was fixed a couple of argo versions back. However, as a result the executor pod makes double calls to the executor, which as a consequence also results in superfluous logs. This change removes the additional call to emissary to resolve this. Signed-off-by: Humair Khan --------- Signed-off-by: Humair Khan Signed-off-by: KevinGrantLee * chore: add PaulinaPacyna and ouadakarim as reviewers (#11180) Signed-off-by: Chen Sun Signed-off-by: KevinGrantLee * test: Move run-all-gcpc-modules to GitHub Actions (#11157) * add gcpc modules tests to gha Signed-off-by: Amanpreet Singh Bedi * remove run-all-gcpc-modules test driver script Signed-off-by: Amanpreet Singh Bedi * fix path under gcpc modules tests github action Signed-off-by: Amanpreet Singh Bedi * upgrade ubuntu base image Signed-off-by: Amanpreet Singh Bedi * upgrade python version to 3.9 Signed-off-by: Amanpreet Singh Bedi --------- Signed-off-by: Amanpreet Singh Bedi Signed-off-by: Amanpreet Singh Bedi Co-authored-by: Amanpreet Singh Bedi Signed-off-by: KevinGrantLee * fix(sdk): Kfp support for pip trusted host (#11151) Signed-off-by: Diego Lovison Signed-off-by: KevinGrantLee * chore(sdk): Loosening kubernetes dependency constraint (#11079) * Loosening kubernetes dependency constraint Signed-off-by: egeucak * added setuptools in test script Signed-off-by: egeucak --------- Signed-off-by: egeucak Signed-off-by: KevinGrantLee * chore: Remove unwanted Frontend test files (#10973) Signed-off-by: ddalvi Signed-off-by: KevinGrantLee * fix(ui): fixes empty string value in pipeline parameters (#11175) Signed-off-by: Jan Staněk Co-authored-by: Jan Staněk Signed-off-by: KevinGrantLee * chore(backend): update driver and launcher default images (#11182) Signed-off-by: Chen Sun Signed-off-by: KevinGrantLee * chore(release): bumped version to
2.3.0 Signed-off-by: KevinGrantLee * chore: Update RELEASE.md to remove obsolete instructions (#11183) Signed-off-by: Chen Sun Signed-off-by: KevinGrantLee * chore: Release kfp-pipeline-spec 0.4.0 (#11189) Signed-off-by: Chen Sun Signed-off-by: KevinGrantLee * chore: release kfp-kubernetes 1.3.0 (#11190) Signed-off-by: Chen Sun Signed-off-by: KevinGrantLee * chore: update kfp-kubernetes release scripts and instructions (#11191) Signed-off-by: Chen Sun Signed-off-by: KevinGrantLee * feat(sdk)!: Pin kfp-pipeline-spec==0.4.0, kfp-server-api>=2.1.0,<2.4.0 (#11192) Signed-off-by: Chen Sun Signed-off-by: KevinGrantLee * chore(sdk): release KFP SDK 2.9.0 (#11193) Signed-off-by: Chen Sun Signed-off-by: KevinGrantLee * Delete test pipelines as they are duplicate with pipeline_with_resource_spec Signed-off-by: KevinGrantLee --------- Signed-off-by: KevinGrantLee Signed-off-by: zazulam Signed-off-by: droctothorpe Signed-off-by: Googler Signed-off-by: Giulio Frasca Signed-off-by: vmudadla Signed-off-by: Liav Weiss (EXT-Nokia) Signed-off-by: Elay Aharoni (EXT-Nokia) Signed-off-by: hbelmiro Signed-off-by: Oswaldo Gomez Signed-off-by: Jason Dai Signed-off-by: ddalvi Signed-off-by: Diego Lovison Signed-off-by: Greg Sheremeta Signed-off-by: Fiona Waters Signed-off-by: dependabot[bot] Signed-off-by: Michael Hu Signed-off-by: Humair Khan Signed-off-by: Helber Belmiro Signed-off-by: Chen Sun Signed-off-by: Amanpreet Singh Bedi Signed-off-by: Amanpreet Singh Bedi Signed-off-by: egeucak Signed-off-by: Jan Staněk Co-authored-by: Michael Co-authored-by: droctothorpe Co-authored-by: andreafehrman Co-authored-by: MonicaZhang1 Co-authored-by: kylekaminky Co-authored-by: CarterFendley Co-authored-by: Googler Co-authored-by: Giulio Frasca Co-authored-by: Vani Haripriya Mudadla Co-authored-by: Liav Weiss <74174727+liavweiss@users.noreply.github.com> Co-authored-by: Liav Weiss (EXT-Nokia) Co-authored-by: owmasch Co-authored-by: quinnovator Co-authored-by: ElayAharoni 
<62550608+ElayAharoni@users.noreply.github.com> Co-authored-by: Elay Aharoni (EXT-Nokia) Co-authored-by: Helber Belmiro Co-authored-by: Oswaldo Gomez Co-authored-by: Oswaldo Gomez Co-authored-by: Jason Dai Co-authored-by: Dharmit Dalvi Co-authored-by: Diego Lovison Co-authored-by: Greg Sheremeta Co-authored-by: Fiona Waters Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Michael Hu Co-authored-by: Humair Khan Co-authored-by: Chen Sun Co-authored-by: aman23bedi <168064369+aman23bedi@users.noreply.github.com> Co-authored-by: Amanpreet Singh Bedi Co-authored-by: ege uçak Co-authored-by: Jan Staněk Co-authored-by: Jan Staněk --- sdk/RELEASE.md | 1 + sdk/python/kfp/compiler/compiler_test.py | 24 +- .../kfp/compiler/pipeline_spec_builder.py | 49 +++- sdk/python/kfp/dsl/pipeline_task.py | 116 ++++---- sdk/python/kfp/dsl/pipeline_task_test.py | 76 ++--- sdk/python/kfp/dsl/structures.py | 10 +- .../pipeline_with_resource_spec.yaml | 14 +- ...th_string_machine_fields_pipeline_input.py | 26 ++ ..._string_machine_fields_pipeline_input.yaml | 116 ++++++++ ..._with_string_machine_fields_task_output.py | 41 +++ ...ith_string_machine_fields_task_output.yaml | 265 ++++++++++++++++++ 11 files changed, 608 insertions(+), 130 deletions(-) create mode 100644 sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.py create mode 100644 sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.yaml create mode 100644 sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.py create mode 100644 sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.yaml diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index 7eda198b920..4419f59dfad 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -1,6 +1,7 @@ # Current Version (in development) ## Features +* Support dynamic machine type parameters in pipeline task setters. 
[\#11097](https://github.com/kubeflow/pipelines/pull/11097) ## Breaking changes diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 919237fecdc..598983af778 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -3382,31 +3382,31 @@ def simple_pipeline(): ['exec-return-1']['container']) self.assertEqual( - 5, dict_format['deploymentSpec']['executors']['exec-return-1-2'] - ['container']['resources']['cpuLimit']) + '5', dict_format['deploymentSpec']['executors']['exec-return-1-2'] + ['container']['resources']['resourceCpuLimit']) self.assertNotIn( 'memoryLimit', dict_format['deploymentSpec']['executors'] ['exec-return-1-2']['container']['resources']) self.assertEqual( - 50, dict_format['deploymentSpec']['executors']['exec-return-1-3'] - ['container']['resources']['memoryLimit']) + '50G', dict_format['deploymentSpec']['executors']['exec-return-1-3'] + ['container']['resources']['resourceMemoryLimit']) self.assertNotIn( 'cpuLimit', dict_format['deploymentSpec']['executors'] ['exec-return-1-3']['container']['resources']) self.assertEqual( - 2, dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['cpuRequest']) + '2', dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['resourceCpuRequest']) self.assertEqual( - 5, dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['cpuLimit']) + '5', dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['resourceCpuLimit']) self.assertEqual( - 4, dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['memoryRequest']) + '4G', dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['resourceMemoryRequest']) self.assertEqual( - 50, dict_format['deploymentSpec']['executors']['exec-return-1-4'] - 
['container']['resources']['memoryLimit']) + '50G', dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['resourceMemoryLimit']) class TestPlatformConfig(unittest.TestCase): diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index 6d2a0cfa9d2..6e4bc4e8690 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -128,6 +128,12 @@ def build_task_spec_for_task( pipeline_task_spec.retry_policy.CopyFrom( task._task_spec.retry_policy.to_proto()) + # Inject resource fields into inputs + if task.container_spec and task.container_spec.resources: + for key, val in task.container_spec.resources.__dict__.items(): + if val and pipeline_channel.extract_pipeline_channels_from_any(val): + task.inputs[key] = val + for input_name, input_value in task.inputs.items(): # Since LoopParameterArgument and LoopArtifactArgument and LoopArgumentVariable are narrower # types than PipelineParameterChannel, start with them. @@ -607,6 +613,24 @@ def build_container_spec_for_task( Returns: A PipelineContainerSpec object for the task. 
""" + + def convert_to_placeholder(input_value: str) -> str: + """Checks if input is a pipeline channel and if so, converts to + compiler injected input name.""" + pipeline_channels = ( + pipeline_channel.extract_pipeline_channels_from_any(input_value)) + if pipeline_channels: + assert len(pipeline_channels) == 1 + channel = pipeline_channels[0] + additional_input_name = ( + compiler_utils.additional_input_name_for_pipeline_channel( + channel)) + additional_input_placeholder = placeholders.InputValuePlaceholder( + additional_input_name)._to_string() + input_value = input_value.replace(channel.pattern, + additional_input_placeholder) + return input_value + container_spec = ( pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec( image=task.container_spec.image, @@ -620,23 +644,28 @@ def build_container_spec_for_task( if task.container_spec.resources is not None: if task.container_spec.resources.cpu_request is not None: - container_spec.resources.cpu_request = ( - task.container_spec.resources.cpu_request) + container_spec.resources.resource_cpu_request = ( + convert_to_placeholder( + task.container_spec.resources.cpu_request)) if task.container_spec.resources.cpu_limit is not None: - container_spec.resources.cpu_limit = ( - task.container_spec.resources.cpu_limit) + container_spec.resources.resource_cpu_limit = ( + convert_to_placeholder(task.container_spec.resources.cpu_limit)) if task.container_spec.resources.memory_request is not None: - container_spec.resources.memory_request = ( - task.container_spec.resources.memory_request) + container_spec.resources.resource_memory_request = ( + convert_to_placeholder( + task.container_spec.resources.memory_request)) if task.container_spec.resources.memory_limit is not None: - container_spec.resources.memory_limit = ( - task.container_spec.resources.memory_limit) + container_spec.resources.resource_memory_limit = ( + convert_to_placeholder( + task.container_spec.resources.memory_limit)) if 
task.container_spec.resources.accelerator_count is not None: container_spec.resources.accelerator.CopyFrom( pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec .ResourceSpec.AcceleratorConfig( - type=task.container_spec.resources.accelerator_type, - count=task.container_spec.resources.accelerator_count, + resource_type=convert_to_placeholder( + task.container_spec.resources.accelerator_type), + resource_count=convert_to_placeholder( + task.container_spec.resources.accelerator_count), )) return container_spec diff --git a/sdk/python/kfp/dsl/pipeline_task.py b/sdk/python/kfp/dsl/pipeline_task.py index 2e82d23378a..773fb1e0676 100644 --- a/sdk/python/kfp/dsl/pipeline_task.py +++ b/sdk/python/kfp/dsl/pipeline_task.py @@ -321,9 +321,9 @@ def _ensure_container_spec_exists(self) -> None: f'{caller_method_name} can only be used on single-step components, not pipelines used as components, or special components like importers.' ) - def _validate_cpu_request_limit(self, cpu: str) -> float: + def _validate_cpu_request_limit(self, cpu: str) -> str: """Validates cpu request/limit string and converts to its numeric - value. + string value. Args: cpu: CPU requests or limits. This string should be a number or a @@ -335,17 +335,22 @@ def _validate_cpu_request_limit(self, cpu: str) -> float: ValueError if the cpu request/limit string value is invalid. Returns: - The numeric value (float) of the cpu request/limit. + The numeric string of the cpu request/limit. """ - if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None: - raise ValueError( - 'Invalid cpu string. Should be float or integer, or integer' - ' followed by "m".') - - return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu) + if isinstance(cpu, pipeline_channel.PipelineChannel): + cpu = str(cpu) + else: + if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None: + raise ValueError( + 'Invalid cpu string. 
Should be float or integer, or integer' + ' followed by "m".') + return cpu @block_if_final() - def set_cpu_request(self, cpu: str) -> 'PipelineTask': + def set_cpu_request( + self, + cpu: Union[str, + pipeline_channel.PipelineChannel]) -> 'PipelineTask': """Sets CPU request (minimum) for the task. Args: @@ -370,7 +375,10 @@ def set_cpu_request(self, cpu: str) -> 'PipelineTask': return self @block_if_final() - def set_cpu_limit(self, cpu: str) -> 'PipelineTask': + def set_cpu_limit( + self, + cpu: Union[str, + pipeline_channel.PipelineChannel]) -> 'PipelineTask': """Sets CPU limit (maximum) for the task. Args: @@ -395,7 +403,9 @@ def set_cpu_limit(self, cpu: str) -> 'PipelineTask': return self @block_if_final() - def set_accelerator_limit(self, limit: int) -> 'PipelineTask': + def set_accelerator_limit( + self, limit: Union[int, str, + pipeline_channel.PipelineChannel]) -> 'PipelineTask': """Sets accelerator limit (maximum) for the task. Only applies if accelerator type is also set via .set_accelerator_type(). @@ -406,11 +416,15 @@ def set_accelerator_limit(self, limit: int) -> 'PipelineTask': Self return to allow chained setting calls. 
""" self._ensure_container_spec_exists() - - if isinstance(limit, str): - if re.match(r'[1-9]\d*$', limit) is None: - raise ValueError(f'{"limit"!r} must be positive integer.') - limit = int(limit) + if isinstance(limit, pipeline_channel.PipelineChannel): + limit = str(limit) + else: + if isinstance(limit, int): + limit = str(limit) + if isinstance(limit, str) and re.match(r'^0$|^1$|^2$|^4$|^8$|^16$', + limit) is None: + raise ValueError( + f'{"limit"!r} must be one of 0, 1, 2, 4, 8, 16.') if self.container_spec.resources is not None: self.container_spec.resources.accelerator_count = limit @@ -438,9 +452,9 @@ def set_gpu_limit(self, gpu: str) -> 'PipelineTask': category=DeprecationWarning) return self.set_accelerator_limit(gpu) - def _validate_memory_request_limit(self, memory: str) -> float: + def _validate_memory_request_limit(self, memory: str) -> str: """Validates memory request/limit string and converts to its numeric - value. + string value. Args: memory: Memory requests or limits. This string should be a number or @@ -451,47 +465,24 @@ def _validate_memory_request_limit(self, memory: str) -> float: ValueError if the memory request/limit string value is invalid. Returns: - The numeric value (float) of the memory request/limit. + The numeric string value of the memory request/limit. """ - if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', - memory) is None: - raise ValueError( - 'Invalid memory string. 
Should be a number or a number ' - 'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", ' - '"Gi", "M", "Mi", "K", "Ki".') - - if memory.endswith('E'): - memory = float(memory[:-1]) * constants._E / constants._G - elif memory.endswith('Ei'): - memory = float(memory[:-2]) * constants._EI / constants._G - elif memory.endswith('P'): - memory = float(memory[:-1]) * constants._P / constants._G - elif memory.endswith('Pi'): - memory = float(memory[:-2]) * constants._PI / constants._G - elif memory.endswith('T'): - memory = float(memory[:-1]) * constants._T / constants._G - elif memory.endswith('Ti'): - memory = float(memory[:-2]) * constants._TI / constants._G - elif memory.endswith('G'): - memory = float(memory[:-1]) - elif memory.endswith('Gi'): - memory = float(memory[:-2]) * constants._GI / constants._G - elif memory.endswith('M'): - memory = float(memory[:-1]) * constants._M / constants._G - elif memory.endswith('Mi'): - memory = float(memory[:-2]) * constants._MI / constants._G - elif memory.endswith('K'): - memory = float(memory[:-1]) * constants._K / constants._G - elif memory.endswith('Ki'): - memory = float(memory[:-2]) * constants._KI / constants._G + if isinstance(memory, pipeline_channel.PipelineChannel): + memory = str(memory) else: - # By default interpret as a plain integer, in the unit of Bytes. - memory = float(memory) / constants._G - + if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', + memory) is None: + raise ValueError( + 'Invalid memory string. Should be a number or a number ' + 'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", ' + '"Gi", "M", "Mi", "K", "Ki".') return memory @block_if_final() - def set_memory_request(self, memory: str) -> 'PipelineTask': + def set_memory_request( + self, + memory: Union[str, + pipeline_channel.PipelineChannel]) -> 'PipelineTask': """Sets memory request (minimum) for the task. 
Args: @@ -515,7 +506,10 @@ def set_memory_request(self, memory: str) -> 'PipelineTask': return self @block_if_final() - def set_memory_limit(self, memory: str) -> 'PipelineTask': + def set_memory_limit( + self, + memory: Union[str, + pipeline_channel.PipelineChannel]) -> 'PipelineTask': """Sets memory limit (maximum) for the task. Args: @@ -579,7 +573,9 @@ def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask': return self.set_accelerator_type(accelerator) @block_if_final() - def set_accelerator_type(self, accelerator: str) -> 'PipelineTask': + def set_accelerator_type( + self, accelerator: Union[str, pipeline_channel.PipelineChannel] + ) -> 'PipelineTask': """Sets accelerator type to use when executing this task. Args: @@ -589,14 +585,16 @@ def set_accelerator_type(self, accelerator: str) -> 'PipelineTask': Self return to allow chained setting calls. """ self._ensure_container_spec_exists() + if isinstance(accelerator, pipeline_channel.PipelineChannel): + accelerator = str(accelerator) if self.container_spec.resources is not None: self.container_spec.resources.accelerator_type = accelerator if self.container_spec.resources.accelerator_count is None: - self.container_spec.resources.accelerator_count = 1 + self.container_spec.resources.accelerator_count = '1' else: self.container_spec.resources = structures.ResourceSpec( - accelerator_count=1, accelerator_type=accelerator) + accelerator_count='1', accelerator_type=accelerator) return self diff --git a/sdk/python/kfp/dsl/pipeline_task_test.py b/sdk/python/kfp/dsl/pipeline_task_test.py index c5bedc38111..8543058b826 100644 --- a/sdk/python/kfp/dsl/pipeline_task_test.py +++ b/sdk/python/kfp/dsl/pipeline_task_test.py @@ -145,42 +145,40 @@ def test_set_caching_options(self): @parameterized.parameters( { 'cpu': '123', - 'expected_cpu_number': 123, + 'expected_cpu': '123', }, { 'cpu': '123m', - 'expected_cpu_number': 0.123, + 'expected_cpu': '123m', }, { 'cpu': '123.0', - 'expected_cpu_number': 123, + 
             'expected_cpu': '123.0',
         },
         {
             'cpu': '123.0m',
-            'expected_cpu_number': 0.123,
+            'expected_cpu': '123.0m',
         },
     )
-    def test_set_valid_cpu_request_limit(self, cpu: str,
-                                         expected_cpu_number: float):
+    def test_set_valid_cpu_request_limit(self, cpu: str, expected_cpu: str):
         task = pipeline_task.PipelineTask(
             component_spec=structures.ComponentSpec.from_yaml_documents(
                 V2_YAML),
             args={'input1': 'value'},
         )
         task.set_cpu_request(cpu)
-        self.assertEqual(expected_cpu_number,
+        self.assertEqual(expected_cpu,
                          task.container_spec.resources.cpu_request)
         task.set_cpu_limit(cpu)
-        self.assertEqual(expected_cpu_number,
-                         task.container_spec.resources.cpu_limit)
+        self.assertEqual(expected_cpu, task.container_spec.resources.cpu_limit)

     @parameterized.parameters(
         {
-            'gpu_limit': '123',
-            'expected_gpu_number': 123,
+            'gpu_limit': '1',
+            'expected_gpu_number': '1',
         },)
     def test_set_valid_gpu_limit(self, gpu_limit: str,
-                                 expected_gpu_number: int):
+                                 expected_gpu_number: str):
         task = pipeline_task.PipelineTask(
             component_spec=structures.ComponentSpec.from_yaml_documents(
                 V2_YAML),
@@ -210,15 +208,19 @@ def test_add_valid_node_selector_constraint(self):
     @parameterized.parameters(
         {
-            'limit': '123',
-            'expected': 123,
+            'limit': '1',
+            'expected_limit': '1',
         },
         {
-            'limit': 123,
-            'expected': 123,
+            'limit': 1,
+            'expected_limit': '1',
+        },
+        {
+            'limit': 16,
+            'expected_limit': '16',
         },
     )
-    def test_set_accelerator_limit(self, limit, expected):
+    def test_set_accelerator_limit(self, limit, expected_limit):
         task = pipeline_task.PipelineTask(
             component_spec=structures.ComponentSpec.from_yaml_documents(
                 V2_YAML),
@@ -226,74 +228,74 @@ def test_set_accelerator_limit(self, limit, expected):
         )
         task.set_accelerator_limit(limit)
-        self.assertEqual(expected,
+        self.assertEqual(expected_limit,
                          task.container_spec.resources.accelerator_count)

     @parameterized.parameters(
         {
             'memory': '1E',
-            'expected_memory_number': 1000000000,
+            'expected_memory': '1E',
         },
         {
             'memory': '15Ei',
-            'expected_memory_number': 17293822569.102703,
+            'expected_memory': '15Ei',
         },
         {
             'memory': '2P',
-            'expected_memory_number': 2000000,
+            'expected_memory': '2P',
         },
         {
             'memory': '25Pi',
-            'expected_memory_number': 28147497.6710656,
+            'expected_memory': '25Pi',
         },
         {
             'memory': '3T',
-            'expected_memory_number': 3000,
+            'expected_memory': '3T',
         },
         {
             'memory': '35Ti',
-            'expected_memory_number': 38482.90697216,
+            'expected_memory': '35Ti',
         },
         {
             'memory': '4G',
-            'expected_memory_number': 4,
+            'expected_memory': '4G',
         },
         {
             'memory': '45Gi',
-            'expected_memory_number': 48.31838208,
+            'expected_memory': '45Gi',
         },
         {
             'memory': '5M',
-            'expected_memory_number': 0.005,
+            'expected_memory': '5M',
         },
         {
             'memory': '55Mi',
-            'expected_memory_number': 0.05767168,
+            'expected_memory': '55Mi',
         },
         {
             'memory': '6K',
-            'expected_memory_number': 0.000006,
+            'expected_memory': '6K',
         },
         {
             'memory': '65Ki',
-            'expected_memory_number': 0.00006656,
+            'expected_memory': '65Ki',
         },
         {
             'memory': '7000',
-            'expected_memory_number': 0.000007,
+            'expected_memory': '7000',
         },
     )
-    def test_set_memory_limit(self, memory: str, expected_memory_number: int):
+    def test_set_memory_limit(self, memory: str, expected_memory: str):
         task = pipeline_task.PipelineTask(
             component_spec=structures.ComponentSpec.from_yaml_documents(
                 V2_YAML),
             args={'input1': 'value'},
         )
         task.set_memory_request(memory)
-        self.assertEqual(expected_memory_number,
+        self.assertEqual(expected_memory,
                          task.container_spec.resources.memory_request)
         task.set_memory_limit(memory)
-        self.assertEqual(expected_memory_number,
+        self.assertEqual(expected_memory,
                          task.container_spec.resources.memory_limit)

     def test_set_accelerator_type_with_type_only(self):
@@ -305,7 +307,7 @@ def test_set_accelerator_type_with_type_only(self):
         task.set_accelerator_type('NVIDIA_TESLA_K80')
         self.assertEqual(
             structures.ResourceSpec(
-                accelerator_type='NVIDIA_TESLA_K80', accelerator_count=1),
+                accelerator_type='NVIDIA_TESLA_K80', accelerator_count='1'),
             task.container_spec.resources)

     def test_set_accelerator_type_with_accelerator_count(self):
@@ -314,10 +316,10 @@ def test_set_accelerator_type_with_accelerator_count(self):
                 V2_YAML),
             args={'input1': 'value'},
         )
-        task.set_accelerator_limit('5').set_accelerator_type('TPU_V3')
+        task.set_accelerator_limit('4').set_accelerator_type('TPU_V3')
         self.assertEqual(
             structures.ResourceSpec(
-                accelerator_type='TPU_V3', accelerator_count=5),
+                accelerator_type='TPU_V3', accelerator_count='4'),
             task.container_spec.resources)

     def test_set_env_variable(self):
diff --git a/sdk/python/kfp/dsl/structures.py b/sdk/python/kfp/dsl/structures.py
index 440f9a3940a..5a73d93b35c 100644
--- a/sdk/python/kfp/dsl/structures.py
+++ b/sdk/python/kfp/dsl/structures.py
@@ -239,12 +239,12 @@ class ResourceSpec:
             container.
         accelerator_count (optional): the number of accelerators attached.
     """
-    cpu_request: Optional[float] = None
-    cpu_limit: Optional[float] = None
-    memory_request: Optional[float] = None
-    memory_limit: Optional[float] = None
+    cpu_request: Optional[str] = None
+    cpu_limit: Optional[str] = None
+    memory_request: Optional[str] = None
+    memory_limit: Optional[str] = None
     accelerator_type: Optional[str] = None
-    accelerator_count: Optional[int] = None
+    accelerator_count: Optional[str] = None


 @dataclasses.dataclass
diff --git a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml
index a9d93bc4de1..07a4ca82975 100644
--- a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml
+++ b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml
@@ -61,12 +61,12 @@ deploymentSpec:
         image: gcr.io/my-project/my-fancy-trainer
         resources:
           accelerator:
-            count: '1'
-            type: tpu-v3
-          cpuLimit: 4.0
-          cpuRequest: 2.0
-          memoryLimit: 15.032385536
-          memoryRequest: 4.294967296
+            resourceCount: '1'
+            resourceType: tpu-v3
+          resourceCpuLimit: '4'
+          resourceCpuRequest: '2'
+          resourceMemoryLimit: 14Gi
+          resourceMemoryRequest: 4Gi
 pipelineInfo:
   description: A linear two-step pipeline with resource specification.
   name: two-step-pipeline-with-resource-spec
@@ -119,4 +119,4 @@ root:
       isOptional: true
       parameterType: STRING
 schemaVersion: 2.1.0
-sdkVersion: kfp-2.7.0
+sdkVersion: kfp-2.8.0
diff --git a/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.py b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.py
new file mode 100644
index 00000000000..eb8e1f79fe2
--- /dev/null
+++ b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.py
@@ -0,0 +1,26 @@
+from kfp import dsl
+
+
+@dsl.component
+def sum_numbers(a: int, b: int) -> int:
+    return a + b
+
+
+@dsl.pipeline
+def pipeline(
+    cpu_limit: str = '4000m',
+    memory_limit: str = '15G',
+    accelerator_type: str = 'NVIDIA_TESLA_P4',
+    accelerator_limit: str = '1',
+):
+    sum_numbers_task = sum_numbers(a=1, b=2)
+    sum_numbers_task.set_cpu_limit(cpu_limit)
+    sum_numbers_task.set_memory_limit(memory_limit)
+    sum_numbers_task.set_accelerator_type(accelerator_type)
+    sum_numbers_task.set_accelerator_limit(accelerator_limit)
+
+
+if __name__ == '__main__':
+    from kfp import compiler
+    compiler.Compiler().compile(
+        pipeline_func=pipeline, package_path=__file__.replace('.py', '.yaml'))
diff --git a/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.yaml b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.yaml
new file mode 100644
index 00000000000..65815886409
--- /dev/null
+++ b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_pipeline_input.yaml
@@ -0,0 +1,116 @@
+# PIPELINE DEFINITION
+# Name: pipeline
+# Inputs:
+#    accelerator_limit: str [Default: '1']
+#    accelerator_type: str [Default: 'NVIDIA_TESLA_P4']
+#    cpu_limit: str [Default: '2000m']
+#    memory_limit: str [Default: '10G']
+components:
+  comp-sum-numbers:
+    executorLabel: exec-sum-numbers
+    inputDefinitions:
+      parameters:
+        a:
+          parameterType: NUMBER_INTEGER
+        b:
+          parameterType: NUMBER_INTEGER
+    outputDefinitions:
+      parameters:
+        Output:
+          parameterType: NUMBER_INTEGER
+deploymentSpec:
+  executors:
+    exec-sum-numbers:
+      container:
+        args:
+        - --executor_input
+        - '{{$}}'
+        - --function_to_execute
+        - sum_numbers
+        command:
+        - sh
+        - -c
+        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
+          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
+          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
+          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
+          $0\" \"$@\"\n"
+        - sh
+        - -ec
+        - 'program_path=$(mktemp -d)
+
+
+          printf "%s" "$0" > "$program_path/ephemeral_component.py"
+
+          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
+
+          '
+        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
+          \ *\n\ndef sum_numbers(a: int, b:int) -> int:\n    return a + b\n\n"
+        image: python:3.8
+        resources:
+          accelerator:
+            resourceCount: '{{$.inputs.parameters[''pipelinechannel--accelerator_limit'']}}'
+            resourceType: '{{$.inputs.parameters[''pipelinechannel--accelerator_type'']}}'
+          resourceCpuLimit: '{{$.inputs.parameters[''pipelinechannel--cpu_limit'']}}'
+          resourceMemoryLimit: '{{$.inputs.parameters[''pipelinechannel--memory_limit'']}}'
+pipelineInfo:
+  name: pipeline
+root:
+  dag:
+    tasks:
+      sum-numbers:
+        cachingOptions:
+          enableCache: true
+        componentRef:
+          name: comp-sum-numbers
+        inputs:
+          parameters:
+            a:
+              runtimeValue:
+                constant: 1.0
+            accelerator_count:
+              runtimeValue:
+                constant: '{{$.inputs.parameters[''pipelinechannel--accelerator_limit'']}}'
+            accelerator_type:
+              runtimeValue:
+                constant: '{{$.inputs.parameters[''pipelinechannel--accelerator_type'']}}'
+            b:
+              runtimeValue:
+                constant: 2.0
+            cpu_limit:
+              runtimeValue:
+                constant: '{{$.inputs.parameters[''pipelinechannel--cpu_limit'']}}'
+            memory_limit:
+              runtimeValue:
+                constant: '{{$.inputs.parameters[''pipelinechannel--memory_limit'']}}'
+            pipelinechannel--accelerator_limit:
+              componentInputParameter: accelerator_limit
+            pipelinechannel--accelerator_type:
+              componentInputParameter: accelerator_type
+            pipelinechannel--cpu_limit:
+              componentInputParameter: cpu_limit
+            pipelinechannel--memory_limit:
+              componentInputParameter: memory_limit
+        taskInfo:
+          name: sum-numbers
+  inputDefinitions:
+    parameters:
+      accelerator_limit:
+        defaultValue: '1'
+        isOptional: true
+        parameterType: STRING
+      accelerator_type:
+        defaultValue: NVIDIA_TESLA_P4
+        isOptional: true
+        parameterType: STRING
+      cpu_limit:
+        defaultValue: 2000m
+        isOptional: true
+        parameterType: STRING
+      memory_limit:
+        defaultValue: 10G
+        isOptional: true
+        parameterType: STRING
+schemaVersion: 2.1.0
+sdkVersion: kfp-2.8.0
diff --git a/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.py b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.py
new file mode 100644
index 00000000000..7795bb8a0f4
--- /dev/null
+++ b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.py
@@ -0,0 +1,41 @@
+from kfp import dsl
+
+
+@dsl.component
+def cpu_limit() -> str:
+    return '4000m'
+
+
+@dsl.component
+def memory_limit() -> str:
+    return '15G'
+
+
+@dsl.component
+def accelerator_type() -> str:
+    return 'NVIDIA_TESLA_P4'
+
+
+@dsl.component
+def accelerator_limit() -> str:
+    return '1'
+
+
+@dsl.component
+def sum_numbers(a: int, b: int) -> int:
+    return a + b
+
+
+@dsl.pipeline
+def pipeline():
+    sum_numbers_task = sum_numbers(a=1, b=2)
+    sum_numbers_task.set_cpu_limit(cpu_limit().output)
+    sum_numbers_task.set_memory_limit(memory_limit().output)
+    sum_numbers_task.set_accelerator_type(accelerator_type().output)
+    sum_numbers_task.set_accelerator_limit(accelerator_limit().output)
+
+
+if __name__ == '__main__':
+    from kfp import compiler
+    compiler.Compiler().compile(
+        pipeline_func=pipeline, package_path=__file__.replace('.py', '.yaml'))
diff --git a/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.yaml b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.yaml
new file mode 100644
index 00000000000..13f80133f8b
--- /dev/null
+++ b/sdk/python/test_data/pipelines/pipeline_with_string_machine_fields_task_output.yaml
@@ -0,0 +1,265 @@
+# PIPELINE DEFINITION
+# Name: pipeline
+components:
+  comp-accelerator-limit:
+    executorLabel: exec-accelerator-limit
+    outputDefinitions:
+      parameters:
+        Output:
+          parameterType: STRING
+  comp-accelerator-type:
+    executorLabel: exec-accelerator-type
+    outputDefinitions:
+      parameters:
+        Output:
+          parameterType: STRING
+  comp-cpu-limit:
+    executorLabel: exec-cpu-limit
+    outputDefinitions:
+      parameters:
+        Output:
+          parameterType: STRING
+  comp-memory-limit:
+    executorLabel: exec-memory-limit
+    outputDefinitions:
+      parameters:
+        Output:
+          parameterType: STRING
+  comp-sum-numbers:
+    executorLabel: exec-sum-numbers
+    inputDefinitions:
+      parameters:
+        a:
+          parameterType: NUMBER_INTEGER
+        b:
+          parameterType: NUMBER_INTEGER
+    outputDefinitions:
+      parameters:
+        Output:
+          parameterType: NUMBER_INTEGER
+deploymentSpec:
+  executors:
+    exec-accelerator-limit:
+      container:
+        args:
+        - --executor_input
+        - '{{$}}'
+        - --function_to_execute
+        - accelerator_limit
+        command:
+        - sh
+        - -c
+        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
+          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
+          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
+          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
+          $0\" \"$@\"\n"
+        - sh
+        - -ec
+        - 'program_path=$(mktemp -d)
+
+
+          printf "%s" "$0" > "$program_path/ephemeral_component.py"
+
+          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
+
+          '
+        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
+          \ *\n\ndef accelerator_limit() -> str:\n    return '1'\n\n"
+        image: python:3.8
+    exec-accelerator-type:
+      container:
+        args:
+        - --executor_input
+        - '{{$}}'
+        - --function_to_execute
+        - accelerator_type
+        command:
+        - sh
+        - -c
+        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
+          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
+          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
+          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
+          $0\" \"$@\"\n"
+        - sh
+        - -ec
+        - 'program_path=$(mktemp -d)
+
+
+          printf "%s" "$0" > "$program_path/ephemeral_component.py"
+
+          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
+
+          '
+        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
+          \ *\n\ndef accelerator_type() -> str:\n    return 'NVIDIA_TESLA_P4'\n\n"
+        image: python:3.8
+    exec-cpu-limit:
+      container:
+        args:
+        - --executor_input
+        - '{{$}}'
+        - --function_to_execute
+        - cpu_limit
+        command:
+        - sh
+        - -c
+        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
+          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
+          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
+          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
+          $0\" \"$@\"\n"
+        - sh
+        - -ec
+        - 'program_path=$(mktemp -d)
+
+
+          printf "%s" "$0" > "$program_path/ephemeral_component.py"
+
+          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
+
+          '
+        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
+          \ *\n\ndef cpu_limit() -> str:\n    return '4000m'\n\n"
+        image: python:3.8
+    exec-memory-limit:
+      container:
+        args:
+        - --executor_input
+        - '{{$}}'
+        - --function_to_execute
+        - memory_limit
+        command:
+        - sh
+        - -c
+        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
+          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
+          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
+          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
+          $0\" \"$@\"\n"
+        - sh
+        - -ec
+        - 'program_path=$(mktemp -d)
+
+
+          printf "%s" "$0" > "$program_path/ephemeral_component.py"
+
+          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
+
+          '
+        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
+          \ *\n\ndef memory_limit() -> str:\n    return '15G'\n\n"
+        image: python:3.8
+    exec-sum-numbers:
+      container:
+        args:
+        - --executor_input
+        - '{{$}}'
+        - --function_to_execute
+        - sum_numbers
+        command:
+        - sh
+        - -c
+        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
+          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
+          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
+          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
+          $0\" \"$@\"\n"
+        - sh
+        - -ec
+        - 'program_path=$(mktemp -d)
+
+
+          printf "%s" "$0" > "$program_path/ephemeral_component.py"
+
+          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
+
+          '
+        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
+          \ *\n\ndef sum_numbers(a: int, b:int) -> int:\n    return a + b\n\n"
+        image: python:3.8
+        resources:
+          accelerator:
+            resourceCount: '{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}'
+            resourceType: '{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}'
+          resourceCpuLimit: '{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}'
+          resourceMemoryLimit: '{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}'
+pipelineInfo:
+  name: pipeline
+root:
+  dag:
+    tasks:
+      accelerator-limit:
+        cachingOptions:
+          enableCache: true
+        componentRef:
+          name: comp-accelerator-limit
+        taskInfo:
+          name: accelerator-limit
+      accelerator-type:
+        cachingOptions:
+          enableCache: true
+        componentRef:
+          name: comp-accelerator-type
+        taskInfo:
+          name: accelerator-type
+      cpu-limit:
+        cachingOptions:
+          enableCache: true
+        componentRef:
+          name: comp-cpu-limit
+        taskInfo:
+          name: cpu-limit
+      memory-limit:
+        cachingOptions:
+          enableCache: true
+        componentRef:
+          name: comp-memory-limit
+        taskInfo:
+          name: memory-limit
+      sum-numbers:
+        cachingOptions:
+          enableCache: true
+        componentRef:
+          name: comp-sum-numbers
+        inputs:
+          parameters:
+            a:
+              runtimeValue:
+                constant: 1.0
+            accelerator_count:
+              runtimeValue:
+                constant: '{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}'
+            accelerator_type:
+              runtimeValue:
+                constant: '{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}'
+            b:
+              runtimeValue:
+                constant: 2.0
+            cpu_limit:
+              runtimeValue:
+                constant: '{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}'
+            memory_limit:
+              runtimeValue:
+                constant: '{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}'
+            pipelinechannel--accelerator-limit-Output:
+              taskOutputParameter:
+                outputParameterKey: Output
+                producerTask: accelerator-limit
+            pipelinechannel--accelerator-type-Output:
+              taskOutputParameter:
+                outputParameterKey: Output
+                producerTask: accelerator-type
+            pipelinechannel--cpu-limit-Output:
+              taskOutputParameter:
+                outputParameterKey: Output
+                producerTask: cpu-limit
+            pipelinechannel--memory-limit-Output:
+              taskOutputParameter:
+                outputParameterKey: Output
+                producerTask: memory-limit
+        taskInfo:
+          name: sum-numbers
+schemaVersion: 2.1.0
+sdkVersion: kfp-2.8.0