From 7b7918ebf8c30e6ceec99283ef20dbc02fdf6a42 Mon Sep 17 00:00:00 2001 From: Googler Date: Mon, 12 Aug 2024 15:11:08 -0700 Subject: [PATCH] feat(components): Support dynamic values for boot_disk_type, boot_disk_size in preview.custom_job.utils.create_custom_training_job_from_component Signed-off-by: Googler PiperOrigin-RevId: 662242688 --- components/google-cloud/RELEASE.md | 1 + .../preview/custom_job/remote_runner.py | 39 ++++++++++++------- .../preview/custom_job/utils.py | 38 +++++++++++------- 3 files changed, 49 insertions(+), 29 deletions(-) diff --git a/components/google-cloud/RELEASE.md b/components/google-cloud/RELEASE.md index af292c0ab47..3dd1e6cd211 100644 --- a/components/google-cloud/RELEASE.md +++ b/components/google-cloud/RELEASE.md @@ -1,5 +1,6 @@ ## Upcoming release * Fix to model batch explanation component for Structured Data pipelines; image bump. +* Add dynamic support for boot_disk_type, boot_disk_size in `preview.custom_job.utils.create_custom_training_job_from_component`. ## Release 2.16.0 * Updated the Starry Net pipeline's template gallery description, and added dataprep_nan_threshold and dataprep_zero_threshold args to the Starry Net pipeline. diff --git a/components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py b/components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py index e56548c002f..bd490895251 100644 --- a/components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py +++ b/components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py @@ -32,23 +32,32 @@ def insert_system_labels_into_payload(payload): return json.dumps(job_spec) -def cast_accelerator_count_to_int(payload): - """Casts accelerator_count from string to an int.""" +def is_json(test_string: str) -> bool: + try: + json.loads(test_string) + except ValueError: + return False + return True + + +def parse_nested_json_strings(payload): + """Parse nested json strings in the payload.""" job_spec = json.loads(payload) - # TODO(b/353577594): accelerator_count placeholder is not resolved to int. - # Need to typecast to int to avoid type mismatch error. Can remove when fix - # placeholder resolution. - if ( - 'accelerator_count' - in job_spec['job_spec']['worker_pool_specs'][0]['machine_spec'] + # TODO(b/353577594): Nested placeholder fields inside worker_pool_specs are + # not parsed correctly in backend. Can remove when fix backend logic. + worker_pool_spec = job_spec['job_spec']['worker_pool_specs'][0] + if is_json( + worker_pool_spec.get('machine_spec', {}).get('accelerator_count', '') + ): + worker_pool_spec['machine_spec']['accelerator_count'] = json.loads( + worker_pool_spec['machine_spec']['accelerator_count'] + ) + if is_json( + worker_pool_spec.get('disk_spec', {}).get('boot_disk_size_gb', '') ): - job_spec['job_spec']['worker_pool_specs'][0]['machine_spec'][ - 'accelerator_count' - ] = int( - job_spec['job_spec']['worker_pool_specs'][0]['machine_spec'][ - 'accelerator_count' - ] + worker_pool_spec['disk_spec']['boot_disk_size_gb'] = json.loads( + worker_pool_spec['disk_spec']['boot_disk_size_gb'] ) return json.dumps(job_spec) @@ -107,7 +116,7 @@ def create_custom_job( # Create custom job if it does not exist job_name = remote_runner.check_if_job_exists() if job_name is None: - payload = cast_accelerator_count_to_int(payload) + payload = parse_nested_json_strings(payload) job_name = remote_runner.create_job( create_custom_job_with_client, insert_system_labels_into_payload(payload), diff --git a/components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py b/components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py index ed35a559f55..ec5bdaa7826 100644 --- a/components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py +++ b/components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py @@ -84,8 +84,8 @@ def create_custom_training_job_from_component( machine_type: The type of the machine to run the CustomJob. The default value is "n1-standard-4". See [more information](https://cloud.google.com/vertex-ai/docs/training/configure-compute#machine-types). accelerator_type: The type of accelerator(s) that may be attached to the machine per `accelerator_count`. See [more information](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype). accelerator_count: The number of accelerators to attach to the machine. Defaults to 1 if `accelerator_type` is set statically. - boot_disk_type: Type of the boot disk (default is "pd-ssd"). Valid values: "pd-ssd" (Persistent Disk Solid State Drive) or "pd-standard" (Persistent Disk Hard Disk Drive). boot_disk_type is set as a static value and cannot be changed as a pipeline parameter. - boot_disk_size_gb: Size in GB of the boot disk (default is 100GB). `boot_disk_size_gb` is set as a static value and cannot be changed as a pipeline parameter. + boot_disk_type: Type of the boot disk (default is "pd-ssd"). Valid values: "pd-ssd" (Persistent Disk Solid State Drive) or "pd-standard" (Persistent Disk Hard Disk Drive). + boot_disk_size_gb: Size in GB of the boot disk (default is 100GB). timeout: The maximum job running time. The default is 7 days. A duration in seconds with up to nine fractional digits, terminated by 's', for example: "3.5s". restart_job_on_worker_restart: Restarts the entire CustomJob if a worker gets restarted. This feature can be used by distributed training jobs that are not resilient to workers leaving and joining a job. service_account: Sets the default service account for workload run-as account. The [service account](https://cloud.google.com/vertex-ai/docs/pipelines/configure-project#service-account) running the pipeline submitting jobs must have act-as permission on this run-as account. If unspecified, the Vertex AI Custom Code [Service Agent](https://cloud.google.com/vertex-ai/docs/general/access-control#service-agents) for the CustomJob's project. @@ -94,11 +94,11 @@ def create_custom_training_job_from_component( tensorboard: The name of a Vertex AI TensorBoard resource to which this CustomJob will upload TensorBoard logs. enable_web_access: Whether you want Vertex AI to enable [interactive shell access](https://cloud.google.com/vertex-ai/docs/training/monitor-debug-interactive-shell) to training containers. If `True`, you can access interactive shells at the URIs given by [CustomJob.web_access_uris][]. reserved_ip_ranges: A list of names for the reserved IP ranges under the VPC network that can be used for this job. If set, we will deploy the job within the provided IP ranges. Otherwise, the job will be deployed to any IP ranges under the provided VPC network. - nfs_mounts: A list of [NfsMount](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#NfsMount) resource specs in Json dict format. For more details about mounting NFS for CustomJob, see [Mount an NFS share for custom training](https://cloud.google.com/vertex-ai/docs/training/train-nfs-share). + nfs_mounts: A list of [NfsMount](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#NfsMount) resource specs in Json dict format. For more details about mounting NFS for CustomJob, see [Mount an NFS share for custom training](https://cloud.google.com/vertex-ai/docs/training/train-nfs-share). `nfs_mounts` is set as a static value and cannot be changed as a pipeline parameter. base_output_directory: The Cloud Storage location to store the output of this CustomJob or HyperparameterTuningJob. See [more information](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/GcsDestination). labels: The labels with user-defined metadata to organize the CustomJob. See [more information](https://goo.gl/xmQnxf). persistent_resource_id: The ID of the PersistentResource in the same Project and Location which to run. The default value is a placeholder that will be resolved to the PipelineJob [RuntimeConfig](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/projects.locations.pipelineJobs#PipelineJob.RuntimeConfig)'s persistent resource id at runtime. However, if the PipelineJob doesn't set Persistent Resource as the job level runtime, the placedholder will be resolved to an empty string and the custom job will be run on demand. If the value is set explicitly, the custom job will runs in the specified persistent resource, in this case, please note the network and CMEK configs on the job should be consistent with those on the PersistentResource, otherwise, the job will be rejected. (This is a Preview feature not yet recommended for production workloads.) - env: Environment variables to be passed to the container. Takes the form `[{'name': '...', 'value': '...'}]`. Maximum limit is 100. + env: Environment variables to be passed to the container. Takes the form `[{'name': '...', 'value': '...'}]`. Maximum limit is 100. `env` is set as a static value and cannot be changed as a pipeline parameter. Returns: A KFP component with CustomJob specification applied. @@ -164,12 +164,11 @@ def create_custom_training_job_from_component( ), 'env': env or [], }, + 'disk_spec': { + 'boot_disk_type': "{{$.inputs.parameters['boot_disk_type']}}", + 'boot_disk_size_gb': "{{$.inputs.parameters['boot_disk_size_gb']}}", + }, } - if boot_disk_type: - worker_pool_spec['disk_spec'] = { - 'boot_disk_type': boot_disk_type, - 'boot_disk_size_gb': boot_disk_size_gb, - } if nfs_mounts: worker_pool_spec['nfs_mounts'] = nfs_mounts @@ -211,10 +210,7 @@ def create_custom_training_job_from_component( 'defaultValue' ] = default_value - # add machine parameters into the customjob component - if accelerator_type == 'ACCELERATOR_TYPE_UNSPECIFIED': - accelerator_count = 0 - + # add workerPoolSpec parameters into the customjob component cj_component_spec['inputDefinitions']['parameters']['machine_type'] = { 'parameterType': 'STRING', 'defaultValue': machine_type, @@ -227,7 +223,21 @@ def create_custom_training_job_from_component( } cj_component_spec['inputDefinitions']['parameters']['accelerator_count'] = { 'parameterType': 'NUMBER_INTEGER', - 'defaultValue': accelerator_count, + 'defaultValue': ( + accelerator_count + if accelerator_type != 'ACCELERATOR_TYPE_UNSPECIFIED' + else 0 + ), + 'isOptional': True, + } + cj_component_spec['inputDefinitions']['parameters']['boot_disk_type'] = { + 'parameterType': 'STRING', + 'defaultValue': boot_disk_type, + 'isOptional': True, + } + cj_component_spec['inputDefinitions']['parameters']['boot_disk_size_gb'] = { + 'parameterType': 'NUMBER_INTEGER', + 'defaultValue': boot_disk_size_gb, 'isOptional': True, }