feat: support autoscaling in Ray on Vertex
PiperOrigin-RevId: 668047841
yinghsienwu authored and copybara-github committed Aug 27, 2024
1 parent f334321 commit 961da42
Showing 5 changed files with 79 additions and 19 deletions.
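
For context, a minimal usage sketch of the autoscaling support this commit adds, assuming the existing vertex_ray.create_ray_cluster entry point; the project, cluster name, and any keyword other than head_node_type / worker_node_types are illustrative rather than taken from this diff.

from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from google.cloud.aiplatform.vertex_ray import AutoscalingSpec, Resources

aiplatform.init(project="my-project", location="us-central1")  # placeholder values

# Head node: fixed single replica; autoscaling_spec must stay None here
# (the new validation in cluster_init.py rejects it otherwise).
head = Resources(machine_type="n1-standard-16", node_count=1)

# Worker pool: scales between 1 and 4 replicas instead of using a fixed node_count.
workers = [
    Resources(
        machine_type="n1-standard-16",
        autoscaling_spec=AutoscalingSpec(min_replica_count=1, max_replica_count=4),
    )
]

cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head,
    worker_node_types=workers,
    cluster_name="my-autoscaling-cluster",  # assumed keyword; other arguments omitted
)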
2 changes: 2 additions & 0 deletions google/cloud/aiplatform/vertex_ray/__init__.py
@@ -36,6 +36,7 @@
from google.cloud.aiplatform.vertex_ray import data

from google.cloud.aiplatform.vertex_ray.util.resources import (
AutoscalingSpec,
Resources,
NodeImages,
PscIConfig,
@@ -60,6 +61,7 @@
"get_ray_cluster",
"list_ray_clusters",
"update_ray_cluster",
"AutoscalingSpec",
"Resources",
"NodeImages",
"PscIConfig",
37 changes: 31 additions & 6 deletions google/cloud/aiplatform/vertex_ray/cluster_init.py
@@ -176,6 +176,11 @@ def create_ray_cluster(
"[Ray on Vertex AI]: For head_node_type, "
+ "Resources.node_count must be 1."
)
if head_node_type.autoscaling_spec is not None:
raise ValueError(
"[Ray on Vertex AI]: For head_node_type, "
+ "Resources.autoscaling_spec must be None."
)
if (
head_node_type.accelerator_type is None
and head_node_type.accelerator_count > 0
@@ -225,18 +230,38 @@ def create_ray_cluster(
"[Ray on Vertex]: accelerator_type must be specified when"
+ " accelerator_count is set to a value other than 0."
)
# Worker and head share the same MachineSpec, merge them into the
# same ResourcePool
additional_replica_count = resources._check_machine_spec_identical(
head_node_type, worker_node_type
)
resource_pool_0.replica_count = (
resource_pool_0.replica_count + additional_replica_count
)
if worker_node_type.autoscaling_spec is None:
# Worker and head share the same MachineSpec, merge them into the
# same ResourcePool
resource_pool_0.replica_count = (
resource_pool_0.replica_count + additional_replica_count
)
else:
if additional_replica_count > 0:
# Autoscaling for single ResourcePool (homogeneous cluster).
resource_pool_0.replica_count = None
resource_pool_0.autoscaling_spec.min_replica_count = (
worker_node_type.autoscaling_spec.min_replica_count
)
resource_pool_0.autoscaling_spec.max_replica_count = (
worker_node_type.autoscaling_spec.max_replica_count
)
if additional_replica_count == 0:
resource_pool = ResourcePool()
resource_pool.id = f"worker-pool{i+1}"
resource_pool.replica_count = worker_node_type.node_count
if worker_node_type.autoscaling_spec is None:
resource_pool.replica_count = worker_node_type.node_count
else:
# Autoscaling for worker ResourcePool.
resource_pool.autoscaling_spec.min_replica_count = (
worker_node_type.autoscaling_spec.min_replica_count
)
resource_pool.autoscaling_spec.max_replica_count = (
worker_node_type.autoscaling_spec.max_replica_count
)
resource_pool.machine_spec.machine_type = worker_node_type.machine_type
resource_pool.machine_spec.accelerator_count = (
worker_node_type.accelerator_count
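
For illustration, a sketch of the ResourcePool that the autoscaling branch above yields for a homogeneous cluster (head and workers sharing one MachineSpec); the pool id, machine type, and the aiplatform_v1beta1 import path are assumptions, while the field assignments mirror the diff.

from google.cloud.aiplatform_v1beta1.types import ResourcePool  # import path assumed

pool = ResourcePool(id="head-node")  # id is illustrative
pool.machine_spec.machine_type = "n1-standard-16"
# Fixed replica_count is cleared; only the worker's autoscaling bounds are sent.
pool.replica_count = None
pool.autoscaling_spec.min_replica_count = 1
pool.autoscaling_spec.max_replica_count = 4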
31 changes: 21 additions & 10 deletions google/cloud/aiplatform/vertex_ray/util/_gapic_utils.py
@@ -27,6 +27,7 @@
)
from google.cloud.aiplatform.vertex_ray.util import _validation_utils
from google.cloud.aiplatform.vertex_ray.util.resources import (
AutoscalingSpec,
Cluster,
PscIConfig,
Resources,
@@ -253,17 +254,27 @@ def persistent_resource_to_cluster(
if _OFFICIAL_IMAGE in worker_image_uri:
# Official training image is not custom
worker_image_uri = None
worker_node_types.append(
Resources(
machine_type=resource_pools[i + 1].machine_spec.machine_type,
accelerator_type=accelerator_type,
accelerator_count=resource_pools[i + 1].machine_spec.accelerator_count,
boot_disk_type=resource_pools[i + 1].disk_spec.boot_disk_type,
boot_disk_size_gb=resource_pools[i + 1].disk_spec.boot_disk_size_gb,
node_count=resource_pools[i + 1].replica_count,
custom_image=worker_image_uri,
)

resource = Resources(
machine_type=resource_pools[i + 1].machine_spec.machine_type,
accelerator_type=accelerator_type,
accelerator_count=resource_pools[i + 1].machine_spec.accelerator_count,
boot_disk_type=resource_pools[i + 1].disk_spec.boot_disk_type,
boot_disk_size_gb=resource_pools[i + 1].disk_spec.boot_disk_size_gb,
node_count=resource_pools[i + 1].replica_count,
custom_image=worker_image_uri,
)
if resource_pools[i + 1].autoscaling_spec:
resource.autoscaling_spec = AutoscalingSpec(
min_replica_count=resource_pools[
i + 1
].autoscaling_spec.min_replica_count,
max_replica_count=resource_pools[
i + 1
].autoscaling_spec.max_replica_count,
)

worker_node_types.append(resource)

cluster.head_node_type = head_node_type
cluster.worker_node_types = worker_node_types
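
With the reverse mapping above, the spec round-trips to callers of get_ray_cluster / list_ray_clusters; a sketch of reading it back, where the persistent resource name is a placeholder:

from google.cloud.aiplatform import vertex_ray

cluster = vertex_ray.get_ray_cluster(
    "projects/my-project/locations/us-central1/persistentResources/my-cluster"
)
for worker in cluster.worker_node_types:
    if worker.autoscaling_spec is not None:
        # Autoscaled pool: node_count is not meaningful, report the bounds.
        print(worker.autoscaling_spec.min_replica_count,
              worker.autoscaling_spec.max_replica_count)
    else:
        print(worker.node_count)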
15 changes: 15 additions & 0 deletions google/cloud/aiplatform/vertex_ray/util/resources.py
@@ -19,6 +19,19 @@
from google.cloud.aiplatform_v1beta1.types import PersistentResource


@dataclasses.dataclass
class AutoscalingSpec:
"""Autoscaling spec for a ray cluster node.
Attributes:
min_replica_count: The minimum number of replicas in the cluster.
max_replica_count: The maximum number of replicas in the cluster.
"""

min_replica_count: int = 1
max_replica_count: int = 2


@dataclasses.dataclass
class Resources:
"""Resources for a ray cluster node.
@@ -39,6 +52,7 @@ class Resources:
be either unspecified or within the range of [100, 64000].
custom_image: Custom image for this resource (e.g.
us-docker.pkg.dev/my-project/ray-gpu.2-9.py310-tf:latest).
autoscaling_spec: Autoscaling spec for this resource.
"""

machine_type: Optional[str] = "n1-standard-16"
@@ -48,6 +62,7 @@ class Resources:
boot_disk_type: Optional[str] = "pd-ssd"
boot_disk_size_gb: Optional[int] = 100
custom_image: Optional[str] = None
autoscaling_spec: Optional[AutoscalingSpec] = None


@dataclasses.dataclass
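
A small sketch of the new field's defaults and how it interacts with node_count, based on the dataclass above and the validation added in cluster_init.py:

from google.cloud.aiplatform.vertex_ray import AutoscalingSpec, Resources

spec = AutoscalingSpec()  # defaults: min_replica_count=1, max_replica_count=2
worker = Resources(autoscaling_spec=spec)
# With autoscaling_spec set, node_count (default 1) is not propagated to the
# ResourcePool; the pool is sized between the min/max bounds instead.

head = Resources(node_count=1)
# Head nodes must leave autoscaling_spec as None; create_ray_cluster raises
# "[Ray on Vertex AI]: For head_node_type, Resources.autoscaling_spec must be None."
# if it is set on head_node_type.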
13 changes: 10 additions & 3 deletions tests/unit/vertex_ray/test_constants.py
@@ -20,6 +20,7 @@

from google.cloud.aiplatform.vertex_ray.util.resources import Cluster
from google.cloud.aiplatform.vertex_ray.util.resources import (
AutoscalingSpec,
PscIConfig,
Resources,
)
@@ -274,7 +275,7 @@ class ClusterConstants:
TEST_WORKER_NODE_TYPES_2_POOLS = [
Resources(
machine_type="n1-standard-16",
node_count=4,
autoscaling_spec=AutoscalingSpec(min_replica_count=1, max_replica_count=4),
accelerator_type="NVIDIA_TESLA_P100",
accelerator_count=1,
)
@@ -283,7 +284,7 @@ class ClusterConstants:
TEST_WORKER_NODE_TYPES_2_POOLS_CUSTOM_IMAGE = [
Resources(
machine_type="n1-standard-16",
node_count=4,
autoscaling_spec=AutoscalingSpec(min_replica_count=1, max_replica_count=4),
accelerator_type="NVIDIA_TESLA_P100",
accelerator_count=1,
custom_image=TEST_CUSTOM_IMAGE,
@@ -311,7 +312,10 @@ class ClusterConstants:
boot_disk_type="pd-ssd",
boot_disk_size_gb=100,
),
replica_count=4,
autoscaling_spec=ResourcePool.AutoscalingSpec(
min_replica_count=1,
max_replica_count=4,
),
)
TEST_REQUEST_RUNNING_2_POOLS = PersistentResource(
resource_pools=[TEST_RESOURCE_POOL_1, TEST_RESOURCE_POOL_2],
@@ -344,6 +348,8 @@ class ClusterConstants:
psc_interface_config=None,
network=ProjectConstants.TEST_VPC_NETWORK,
)
# Responses
TEST_RESOURCE_POOL_2.replica_count = 1
TEST_RESPONSE_RUNNING_2_POOLS = PersistentResource(
name=TEST_VERTEX_RAY_PR_ADDRESS,
resource_pools=[TEST_RESOURCE_POOL_1, TEST_RESOURCE_POOL_2],
@@ -425,6 +431,7 @@ class ClusterConstants:
dashboard_address=TEST_VERTEX_RAY_DASHBOARD_ADDRESS,
ray_metric_enabled=True,
ray_logs_enabled=True,
labels={},
)
TEST_CLUSTER_BYOSA = Cluster(
cluster_resource_name=TEST_VERTEX_RAY_PR_ADDRESS,
