Skip to content

Commit

Permalink
feat(components): Support dynamic values for boot_disk_type, boot_dis…
Browse files Browse the repository at this point in the history
…k_size in preview.custom_job.utils.create_custom_training_job_from_component

Signed-off-by: Googler <nobody@google.com>
PiperOrigin-RevId: 662242688
  • Loading branch information
Googler committed Aug 12, 2024
1 parent 289f64f commit 7b7918e
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 29 deletions.
1 change: 1 addition & 0 deletions components/google-cloud/RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Upcoming release
* Fix to model batch explanation component for Structured Data pipelines; image bump.
* Add dynamic support for boot_disk_type, boot_disk_size in `preview.custom_job.utils.create_custom_training_job_from_component`.

## Release 2.16.0
* Updated the Starry Net pipeline's template gallery description, and added dataprep_nan_threshold and dataprep_zero_threshold args to the Starry Net pipeline.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,32 @@ def insert_system_labels_into_payload(payload):
return json.dumps(job_spec)


def cast_accelerator_count_to_int(payload):
"""Casts accelerator_count from string to an int."""
def is_json(test_string: str) -> bool:
try:
json.loads(test_string)
except ValueError:
return False
return True


def parse_nested_json_strings(payload):
"""Parse nested json strings in the payload."""

job_spec = json.loads(payload)
# TODO(b/353577594): accelerator_count placeholder is not resolved to int.
# Need to typecast to int to avoid type mismatch error. Can remove when fix
# placeholder resolution.
if (
'accelerator_count'
in job_spec['job_spec']['worker_pool_specs'][0]['machine_spec']
# TODO(b/353577594): Nested placeholder fields inside worker_pool_specs are
# not parsed correctly in backend. Can remove when fix backend logic.
worker_pool_spec = job_spec['job_spec']['worker_pool_specs'][0]
if is_json(
worker_pool_spec.get('machine_spec', {}).get('accelerator_count', '')
):
worker_pool_spec['machine_spec']['accelerator_count'] = json.loads(
worker_pool_spec['machine_spec']['accelerator_count']
)
if is_json(
worker_pool_spec.get('disk_spec', {}).get('boot_disk_size_gb', '')
):
job_spec['job_spec']['worker_pool_specs'][0]['machine_spec'][
'accelerator_count'
] = int(
job_spec['job_spec']['worker_pool_specs'][0]['machine_spec'][
'accelerator_count'
]
worker_pool_spec['disk_spec']['boot_disk_size_gb'] = json.loads(
worker_pool_spec['disk_spec']['boot_disk_size_gb']
)
return json.dumps(job_spec)

Expand Down Expand Up @@ -107,7 +116,7 @@ def create_custom_job(
# Create custom job if it does not exist
job_name = remote_runner.check_if_job_exists()
if job_name is None:
payload = cast_accelerator_count_to_int(payload)
payload = parse_nested_json_strings(payload)
job_name = remote_runner.create_job(
create_custom_job_with_client,
insert_system_labels_into_payload(payload),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ def create_custom_training_job_from_component(
machine_type: The type of the machine to run the CustomJob. The default value is "n1-standard-4". See [more information](https://cloud.google.com/vertex-ai/docs/training/configure-compute#machine-types).
accelerator_type: The type of accelerator(s) that may be attached to the machine per `accelerator_count`. See [more information](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype).
accelerator_count: The number of accelerators to attach to the machine. Defaults to 1 if `accelerator_type` is set statically.
boot_disk_type: Type of the boot disk (default is "pd-ssd"). Valid values: "pd-ssd" (Persistent Disk Solid State Drive) or "pd-standard" (Persistent Disk Hard Disk Drive). boot_disk_type is set as a static value and cannot be changed as a pipeline parameter.
boot_disk_size_gb: Size in GB of the boot disk (default is 100GB). `boot_disk_size_gb` is set as a static value and cannot be changed as a pipeline parameter.
boot_disk_type: Type of the boot disk (default is "pd-ssd"). Valid values: "pd-ssd" (Persistent Disk Solid State Drive) or "pd-standard" (Persistent Disk Hard Disk Drive).
boot_disk_size_gb: Size in GB of the boot disk (default is 100GB).
timeout: The maximum job running time. The default is 7 days. A duration in seconds with up to nine fractional digits, terminated by 's', for example: "3.5s".
restart_job_on_worker_restart: Restarts the entire CustomJob if a worker gets restarted. This feature can be used by distributed training jobs that are not resilient to workers leaving and joining a job.
service_account: Sets the default service account for workload run-as account. The [service account](https://cloud.google.com/vertex-ai/docs/pipelines/configure-project#service-account) running the pipeline submitting jobs must have act-as permission on this run-as account. If unspecified, the Vertex AI Custom Code [Service Agent](https://cloud.google.com/vertex-ai/docs/general/access-control#service-agents) for the CustomJob's project.
Expand All @@ -94,11 +94,11 @@ def create_custom_training_job_from_component(
tensorboard: The name of a Vertex AI TensorBoard resource to which this CustomJob will upload TensorBoard logs.
enable_web_access: Whether you want Vertex AI to enable [interactive shell access](https://cloud.google.com/vertex-ai/docs/training/monitor-debug-interactive-shell) to training containers. If `True`, you can access interactive shells at the URIs given by [CustomJob.web_access_uris][].
reserved_ip_ranges: A list of names for the reserved IP ranges under the VPC network that can be used for this job. If set, we will deploy the job within the provided IP ranges. Otherwise, the job will be deployed to any IP ranges under the provided VPC network.
nfs_mounts: A list of [NfsMount](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#NfsMount) resource specs in Json dict format. For more details about mounting NFS for CustomJob, see [Mount an NFS share for custom training](https://cloud.google.com/vertex-ai/docs/training/train-nfs-share).
nfs_mounts: A list of [NfsMount](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#NfsMount) resource specs in Json dict format. For more details about mounting NFS for CustomJob, see [Mount an NFS share for custom training](https://cloud.google.com/vertex-ai/docs/training/train-nfs-share). `nfs_mounts` is set as a static value and cannot be changed as a pipeline parameter.
base_output_directory: The Cloud Storage location to store the output of this CustomJob or HyperparameterTuningJob. See [more information](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/GcsDestination).
labels: The labels with user-defined metadata to organize the CustomJob. See [more information](https://goo.gl/xmQnxf).
persistent_resource_id: The ID of the PersistentResource in the same Project and Location which to run. The default value is a placeholder that will be resolved to the PipelineJob [RuntimeConfig](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/projects.locations.pipelineJobs#PipelineJob.RuntimeConfig)'s persistent resource id at runtime. However, if the PipelineJob doesn't set Persistent Resource as the job level runtime, the placedholder will be resolved to an empty string and the custom job will be run on demand. If the value is set explicitly, the custom job will runs in the specified persistent resource, in this case, please note the network and CMEK configs on the job should be consistent with those on the PersistentResource, otherwise, the job will be rejected. (This is a Preview feature not yet recommended for production workloads.)
env: Environment variables to be passed to the container. Takes the form `[{'name': '...', 'value': '...'}]`. Maximum limit is 100.
env: Environment variables to be passed to the container. Takes the form `[{'name': '...', 'value': '...'}]`. Maximum limit is 100. `env` is set as a static value and cannot be changed as a pipeline parameter.
Returns:
A KFP component with CustomJob specification applied.
Expand Down Expand Up @@ -164,12 +164,11 @@ def create_custom_training_job_from_component(
),
'env': env or [],
},
'disk_spec': {
'boot_disk_type': "{{$.inputs.parameters['boot_disk_type']}}",
'boot_disk_size_gb': "{{$.inputs.parameters['boot_disk_size_gb']}}",
},
}
if boot_disk_type:
worker_pool_spec['disk_spec'] = {
'boot_disk_type': boot_disk_type,
'boot_disk_size_gb': boot_disk_size_gb,
}
if nfs_mounts:
worker_pool_spec['nfs_mounts'] = nfs_mounts

Expand Down Expand Up @@ -211,10 +210,7 @@ def create_custom_training_job_from_component(
'defaultValue'
] = default_value

# add machine parameters into the customjob component
if accelerator_type == 'ACCELERATOR_TYPE_UNSPECIFIED':
accelerator_count = 0

# add workerPoolSpec parameters into the customjob component
cj_component_spec['inputDefinitions']['parameters']['machine_type'] = {
'parameterType': 'STRING',
'defaultValue': machine_type,
Expand All @@ -227,7 +223,21 @@ def create_custom_training_job_from_component(
}
cj_component_spec['inputDefinitions']['parameters']['accelerator_count'] = {
'parameterType': 'NUMBER_INTEGER',
'defaultValue': accelerator_count,
'defaultValue': (
accelerator_count
if accelerator_type != 'ACCELERATOR_TYPE_UNSPECIFIED'
else 0
),
'isOptional': True,
}
cj_component_spec['inputDefinitions']['parameters']['boot_disk_type'] = {
'parameterType': 'STRING',
'defaultValue': boot_disk_type,
'isOptional': True,
}
cj_component_spec['inputDefinitions']['parameters']['boot_disk_size_gb'] = {
'parameterType': 'NUMBER_INTEGER',
'defaultValue': boot_disk_size_gb,
'isOptional': True,
}

Expand Down

0 comments on commit 7b7918e

Please sign in to comment.