From accaa9750d98b7a37b08da3bd2058d9cdd03bd5c Mon Sep 17 00:00:00 2001 From: Amy Wu Date: Thu, 8 Aug 2024 16:11:56 -0700 Subject: [PATCH] feat: support PSC-Interface in Ray on Vertex feat: support disable Cloud logging in Ray on Vertex PiperOrigin-RevId: 661019434 --- .../cloud/aiplatform/vertex_ray/__init__.py | 2 + .../aiplatform/vertex_ray/cluster_init.py | 33 ++++++++-- .../vertex_ray/util/_gapic_utils.py | 17 +++++- .../aiplatform/vertex_ray/util/resources.py | 26 +++++++- tests/unit/vertex_ray/conftest.py | 8 +-- tests/unit/vertex_ray/test_cluster_init.py | 12 +++- tests/unit/vertex_ray/test_constants.py | 61 +++++++++++++++---- 7 files changed, 129 insertions(+), 30 deletions(-) diff --git a/google/cloud/aiplatform/vertex_ray/__init__.py b/google/cloud/aiplatform/vertex_ray/__init__.py index 8e58f0e7da..112ac77ab8 100644 --- a/google/cloud/aiplatform/vertex_ray/__init__.py +++ b/google/cloud/aiplatform/vertex_ray/__init__.py @@ -38,6 +38,7 @@ from google.cloud.aiplatform.vertex_ray.util.resources import ( Resources, NodeImages, + PscIConfig, ) from google.cloud.aiplatform.vertex_ray.dashboard_sdk import ( @@ -61,4 +62,5 @@ "update_ray_cluster", "Resources", "NodeImages", + "PscIConfig", ) diff --git a/google/cloud/aiplatform/vertex_ray/cluster_init.py b/google/cloud/aiplatform/vertex_ray/cluster_init.py index 1894847edc..fd278335e4 100644 --- a/google/cloud/aiplatform/vertex_ray/cluster_init.py +++ b/google/cloud/aiplatform/vertex_ray/cluster_init.py @@ -23,17 +23,20 @@ from google.cloud.aiplatform import initializer from google.cloud.aiplatform import utils from google.cloud.aiplatform.utils import resource_manager_utils -from google.cloud.aiplatform_v1.types import persistent_resource_service +from google.cloud.aiplatform_v1beta1.types import persistent_resource_service -from google.cloud.aiplatform_v1.types.persistent_resource import ( +from google.cloud.aiplatform_v1beta1.types.persistent_resource import ( PersistentResource, + RayLogsSpec, RaySpec, RayMetricSpec, ResourcePool, ResourceRuntimeSpec, ServiceAccountSpec, ) - +from google.cloud.aiplatform_v1beta1.types.service_networking import ( + PscInterfaceConfig, +) from google.cloud.aiplatform.vertex_ray.util import ( _gapic_utils, _validation_utils, @@ -56,6 +59,8 @@ def create_ray_cluster( worker_node_types: Optional[List[resources.Resources]] = [resources.Resources()], custom_images: Optional[resources.NodeImages] = None, enable_metrics_collection: Optional[bool] = True, + enable_logging: Optional[bool] = True, + psc_interface_config: Optional[resources.PscIConfig] = None, labels: Optional[Dict[str, str]] = None, ) -> str: """Create a ray cluster on the Vertex AI. @@ -119,6 +124,8 @@ def create_ray_cluster( head/worker_node_type(s). Note that configuring `Resources.custom_image` will override `custom_images` here. Allowlist only. enable_metrics_collection: Enable Ray metrics collection for visualization. + enable_logging: Enable exporting Ray logs to Cloud Logging. + psc_interface_config: PSC-I config. labels: The labels with user-defined metadata to organize Ray cluster. @@ -258,10 +265,17 @@ def create_ray_cluster( i += 1 resource_pools = [resource_pool_0] + worker_pools - disabled = not enable_metrics_collection - ray_metric_spec = RayMetricSpec(disabled=disabled) + + metrics_collection_disabled = not enable_metrics_collection + ray_metric_spec = RayMetricSpec(disabled=metrics_collection_disabled) + + logging_disabled = not enable_logging + ray_logs_spec = RayLogsSpec(disabled=logging_disabled) + ray_spec = RaySpec( - resource_pool_images=resource_pool_images, ray_metric_spec=ray_metric_spec + resource_pool_images=resource_pool_images, + ray_metric_spec=ray_metric_spec, + ray_logs_spec=ray_logs_spec, ) if service_account: service_account_spec = ServiceAccountSpec( @@ -274,11 +288,18 @@ def create_ray_cluster( ) else: resource_runtime_spec = ResourceRuntimeSpec(ray_spec=ray_spec) + if psc_interface_config: + gapic_psc_interface_config = PscInterfaceConfig( + network_attachment=psc_interface_config.network_attachment, + ) + else: + gapic_psc_interface_config = None persistent_resource = PersistentResource( resource_pools=resource_pools, network=network, labels=labels, resource_runtime_spec=resource_runtime_spec, + psc_interface_config=gapic_psc_interface_config, ) location = initializer.global_config.location diff --git a/google/cloud/aiplatform/vertex_ray/util/_gapic_utils.py b/google/cloud/aiplatform/vertex_ray/util/_gapic_utils.py index bfedef2db3..87f6f824b1 100644 --- a/google/cloud/aiplatform/vertex_ray/util/_gapic_utils.py +++ b/google/cloud/aiplatform/vertex_ray/util/_gapic_utils.py @@ -28,12 +28,13 @@ from google.cloud.aiplatform.vertex_ray.util import _validation_utils from google.cloud.aiplatform.vertex_ray.util.resources import ( Cluster, + PscIConfig, Resources, ) -from google.cloud.aiplatform_v1.types.persistent_resource import ( +from google.cloud.aiplatform_v1beta1.types.persistent_resource import ( PersistentResource, ) -from google.cloud.aiplatform_v1.types.persistent_resource_service import ( +from google.cloud.aiplatform_v1beta1.types.persistent_resource_service import ( GetPersistentResourceRequest, ) @@ -47,7 +48,7 @@ def create_persistent_resource_client(): return initializer.global_config.create_client( client_class=PersistentResourceClientWithOverride, appended_gapic_version="vertex_ray", - ) + ).select_version("v1beta1") def polling_delay(num_attempts: int, time_scale: float) -> datetime.timedelta: @@ -159,6 +160,10 @@ def persistent_resource_to_cluster( % persistent_resource.name, ) return + if persistent_resource.psc_interface_config: + cluster.psc_interface_config = PscIConfig( + network_attachment=persistent_resource.psc_interface_config.network_attachment + ) resource_pools = persistent_resource.resource_pools head_resource_pool = resource_pools[0] @@ -192,6 +197,12 @@ def persistent_resource_to_cluster( ray_version = None cluster.python_version = python_version cluster.ray_version = ray_version + cluster.ray_metric_enabled = not ( + persistent_resource.resource_runtime_spec.ray_spec.ray_metric_spec.disabled + ) + cluster.ray_logs_enabled = not ( + persistent_resource.resource_runtime_spec.ray_spec.ray_logs_spec.disabled + ) accelerator_type = head_resource_pool.machine_spec.accelerator_type if accelerator_type.value != 0: diff --git a/google/cloud/aiplatform/vertex_ray/util/resources.py b/google/cloud/aiplatform/vertex_ray/util/resources.py index 28f28f68fd..3e865f34e3 100644 --- a/google/cloud/aiplatform/vertex_ray/util/resources.py +++ b/google/cloud/aiplatform/vertex_ray/util/resources.py @@ -16,7 +16,7 @@ # import dataclasses from typing import Dict, List, Optional -from google.cloud.aiplatform_v1.types import PersistentResource +from google.cloud.aiplatform_v1beta1.types import PersistentResource @dataclasses.dataclass @@ -68,6 +68,27 @@ class NodeImages: worker: str = None +@dataclasses.dataclass +class PscIConfig: + """PSC-I config. + + Attributes: + network_attachment: Optional. The name or full name of the Compute Engine + `network attachment ` + to attach to the resource. It has a format: + ``projects/{project}/regions/{region}/networkAttachments/{networkAttachment}``. + Where {project} is a project number, as in ``12345``, and + {networkAttachment} is a network attachment name. To specify + this field, you must have already [created a network + attachment] + (https://cloud.google.com/vpc/docs/create-manage-network-attachments#create-network-attachments). + This field is only used for resources using PSC-I. Make sure you do not + specify the network here for VPC peering. + """ + + network_attachment: str = None + + @dataclasses.dataclass class Cluster: """Ray cluster (output only). @@ -111,6 +132,9 @@ class Cluster: head_node_type: Resources = None worker_node_types: List[Resources] = None dashboard_address: str = None + ray_metric_enabled: bool = True + ray_logs_enabled: bool = True + psc_interface_config: PscIConfig = None labels: Dict[str, str] = None diff --git a/tests/unit/vertex_ray/conftest.py b/tests/unit/vertex_ray/conftest.py index de20c135e0..9bebe10e1f 100644 --- a/tests/unit/vertex_ray/conftest.py +++ b/tests/unit/vertex_ray/conftest.py @@ -19,16 +19,16 @@ from google.auth import credentials as auth_credentials from google.cloud import resourcemanager from google.cloud.aiplatform import vertex_ray -from google.cloud.aiplatform_v1.services.persistent_resource_service import ( +from google.cloud.aiplatform_v1beta1.services.persistent_resource_service import ( PersistentResourceServiceClient, ) -from google.cloud.aiplatform_v1.types.persistent_resource import ( +from google.cloud.aiplatform_v1beta1.types.persistent_resource import ( PersistentResource, ) -from google.cloud.aiplatform_v1.types.persistent_resource import ( +from google.cloud.aiplatform_v1beta1.types.persistent_resource import ( ResourceRuntime, ) -from google.cloud.aiplatform_v1.types.persistent_resource_service import ( +from google.cloud.aiplatform_v1beta1.types.persistent_resource_service import ( DeletePersistentResourceRequest, ) import test_constants as tc diff --git a/tests/unit/vertex_ray/test_cluster_init.py b/tests/unit/vertex_ray/test_cluster_init.py index fcb4fa5b6e..a4ddd5f818 100644 --- a/tests/unit/vertex_ray/test_cluster_init.py +++ b/tests/unit/vertex_ray/test_cluster_init.py @@ -22,10 +22,10 @@ Resources, NodeImages, ) -from google.cloud.aiplatform_v1.services.persistent_resource_service import ( +from google.cloud.aiplatform_v1beta1.services.persistent_resource_service import ( PersistentResourceServiceClient, ) -from google.cloud.aiplatform_v1.types import persistent_resource_service +from google.cloud.aiplatform_v1beta1.types import persistent_resource_service import test_constants as tc import mock import pytest @@ -352,6 +352,7 @@ def test_create_ray_cluster_1_pool_gpu_with_labels_success( self, create_persistent_resource_1_pool_mock ): """If head and worker nodes are duplicate, merge to head pool.""" + # Also test disable logging and metrics collection. cluster_name = vertex_ray.create_ray_cluster( head_node_type=tc.ClusterConstants.TEST_HEAD_NODE_TYPE_1_POOL, worker_node_types=tc.ClusterConstants.TEST_WORKER_NODE_TYPES_1_POOL, @@ -359,6 +360,7 @@ def test_create_ray_cluster_1_pool_gpu_with_labels_success( cluster_name=tc.ClusterConstants.TEST_VERTEX_RAY_PR_ID, labels=tc.ClusterConstants.TEST_LABELS, enable_metrics_collection=False, + enable_logging=False, ) assert tc.ClusterConstants.TEST_VERTEX_RAY_PR_ADDRESS == cluster_name @@ -401,11 +403,15 @@ def test_create_ray_cluster_2_pools_success( self, create_persistent_resource_2_pools_mock ): """If head and worker nodes are not duplicate, create separate resource_pools.""" + # Also test PSC-I. + psc_interface_config = vertex_ray.PscIConfig( + network_attachment=tc.ClusterConstants.TEST_PSC_NETWORK_ATTACHMENT + ) cluster_name = vertex_ray.create_ray_cluster( head_node_type=tc.ClusterConstants.TEST_HEAD_NODE_TYPE_2_POOLS, worker_node_types=tc.ClusterConstants.TEST_WORKER_NODE_TYPES_2_POOLS, - network=tc.ProjectConstants.TEST_VPC_NETWORK, cluster_name=tc.ClusterConstants.TEST_VERTEX_RAY_PR_ID, + psc_interface_config=psc_interface_config, ) assert tc.ClusterConstants.TEST_VERTEX_RAY_PR_ADDRESS == cluster_name diff --git a/tests/unit/vertex_ray/test_constants.py b/tests/unit/vertex_ray/test_constants.py index 8326b81595..1f1755b1c5 100644 --- a/tests/unit/vertex_ray/test_constants.py +++ b/tests/unit/vertex_ray/test_constants.py @@ -20,31 +20,36 @@ from google.cloud.aiplatform.vertex_ray.util.resources import Cluster from google.cloud.aiplatform.vertex_ray.util.resources import ( + PscIConfig, Resources, ) -from google.cloud.aiplatform_v1.types.machine_resources import DiskSpec -from google.cloud.aiplatform_v1.types.machine_resources import ( +from google.cloud.aiplatform_v1beta1.types.machine_resources import DiskSpec +from google.cloud.aiplatform_v1beta1.types.machine_resources import ( MachineSpec, ) -from google.cloud.aiplatform_v1.types.persistent_resource import ( +from google.cloud.aiplatform_v1beta1.types.persistent_resource import ( PersistentResource, ) -from google.cloud.aiplatform_v1.types.persistent_resource import ( +from google.cloud.aiplatform_v1beta1.types.persistent_resource import ( + RayLogsSpec, RayMetricSpec, ) -from google.cloud.aiplatform_v1.types.persistent_resource import RaySpec -from google.cloud.aiplatform_v1.types.persistent_resource import ( +from google.cloud.aiplatform_v1beta1.types.persistent_resource import RaySpec +from google.cloud.aiplatform_v1beta1.types.persistent_resource import ( ResourcePool, ) -from google.cloud.aiplatform_v1.types.persistent_resource import ( +from google.cloud.aiplatform_v1beta1.types.persistent_resource import ( ResourceRuntime, ) -from google.cloud.aiplatform_v1.types.persistent_resource import ( +from google.cloud.aiplatform_v1beta1.types.persistent_resource import ( ResourceRuntimeSpec, ) -from google.cloud.aiplatform_v1.types.persistent_resource import ( +from google.cloud.aiplatform_v1beta1.types.persistent_resource import ( ServiceAccountSpec, ) +from google.cloud.aiplatform_v1beta1.types.service_networking import ( + PscInterfaceConfig, +) import pytest @@ -93,6 +98,7 @@ class ClusterConstants: TEST_CPU_IMAGE = "us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest" TEST_GPU_IMAGE = "us-docker.pkg.dev/vertex-ai/training/ray-gpu.2-9.py310:latest" TEST_CUSTOM_IMAGE = "us-docker.pkg.dev/my-project/ray-custom-image.2.9:latest" + TEST_PSC_NETWORK_ATTACHMENT = "my-network-attachment" # RUNNING Persistent Cluster w/o Ray TEST_RESPONSE_NO_RAY_RUNNING = PersistentResource( name=TEST_VERTEX_RAY_PR_ADDRESS, @@ -129,8 +135,10 @@ class ClusterConstants: ray_spec=RaySpec( resource_pool_images={"head-node": TEST_GPU_IMAGE}, ray_metric_spec=RayMetricSpec(disabled=False), + ray_logs_spec=RayLogsSpec(disabled=False), ), ), + psc_interface_config=None, network=ProjectConstants.TEST_VPC_NETWORK, ) TEST_REQUEST_RUNNING_1_POOL_WITH_LABELS = PersistentResource( @@ -139,8 +147,10 @@ class ClusterConstants: ray_spec=RaySpec( resource_pool_images={"head-node": TEST_GPU_IMAGE}, ray_metric_spec=RayMetricSpec(disabled=True), + ray_logs_spec=RayLogsSpec(disabled=True), ), ), + psc_interface_config=None, network=ProjectConstants.TEST_VPC_NETWORK, labels=TEST_LABELS, ) @@ -150,8 +160,10 @@ class ClusterConstants: ray_spec=RaySpec( resource_pool_images={"head-node": TEST_CUSTOM_IMAGE}, ray_metric_spec=RayMetricSpec(disabled=False), + ray_logs_spec=RayLogsSpec(disabled=False), ), ), + psc_interface_config=None, network=ProjectConstants.TEST_VPC_NETWORK, ) TEST_REQUEST_RUNNING_1_POOL_BYOSA = PersistentResource( @@ -160,12 +172,14 @@ class ClusterConstants: ray_spec=RaySpec( resource_pool_images={"head-node": TEST_GPU_IMAGE}, ray_metric_spec=RayMetricSpec(disabled=False), + ray_logs_spec=RayLogsSpec(disabled=False), ), service_account_spec=ServiceAccountSpec( enable_custom_service_account=True, service_account=ProjectConstants.TEST_SERVICE_ACCOUNT, ), ), + psc_interface_config=None, network=None, ) # Get response has generated name, and URIs @@ -176,8 +190,10 @@ class ClusterConstants: ray_spec=RaySpec( resource_pool_images={"head-node": TEST_GPU_IMAGE}, ray_metric_spec=RayMetricSpec(disabled=False), + ray_logs_spec=RayLogsSpec(disabled=False), ), ), + psc_interface_config=None, network=ProjectConstants.TEST_VPC_NETWORK, resource_runtime=ResourceRuntime( access_uris={ @@ -197,6 +213,7 @@ class ClusterConstants: ray_metric_spec=RayMetricSpec(disabled=False), ), ), + psc_interface_config=None, network=ProjectConstants.TEST_VPC_NETWORK, resource_runtime=ResourceRuntime( access_uris={ @@ -219,6 +236,7 @@ class ClusterConstants: service_account=ProjectConstants.TEST_SERVICE_ACCOUNT, ), ), + psc_interface_config=None, network=None, resource_runtime=ResourceRuntime( access_uris={ @@ -241,6 +259,7 @@ class ClusterConstants: service_account=ProjectConstants.TEST_SERVICE_ACCOUNT, ), ), + psc_interface_config=None, network=ProjectConstants.TEST_VPC_NETWORK, resource_runtime=ResourceRuntime( access_uris={ @@ -303,9 +322,12 @@ class ClusterConstants: "worker-pool1": TEST_GPU_IMAGE, }, ray_metric_spec=RayMetricSpec(disabled=False), + ray_logs_spec=RayLogsSpec(disabled=False), ), ), - network=ProjectConstants.TEST_VPC_NETWORK, + psc_interface_config=PscInterfaceConfig( + network_attachment=TEST_PSC_NETWORK_ATTACHMENT + ), ) TEST_REQUEST_RUNNING_2_POOLS_CUSTOM_IMAGE = PersistentResource( resource_pools=[TEST_RESOURCE_POOL_1, TEST_RESOURCE_POOL_2], @@ -316,8 +338,10 @@ class ClusterConstants: "worker-pool1": TEST_CUSTOM_IMAGE, }, ray_metric_spec=RayMetricSpec(disabled=False), + ray_logs_spec=RayLogsSpec(disabled=False), ), ), + psc_interface_config=None, network=ProjectConstants.TEST_VPC_NETWORK, ) TEST_RESPONSE_RUNNING_2_POOLS = PersistentResource( @@ -332,11 +356,13 @@ class ClusterConstants: ray_metric_spec=RayMetricSpec(disabled=False), ), ), - network=ProjectConstants.TEST_VPC_NETWORK, + psc_interface_config=PscInterfaceConfig( + network_attachment=TEST_PSC_NETWORK_ATTACHMENT + ), + network=None, resource_runtime=ResourceRuntime( access_uris={ "RAY_DASHBOARD_URI": TEST_VERTEX_RAY_DASHBOARD_ADDRESS, - "RAY_HEAD_NODE_INTERNAL_IP": TEST_VERTEX_RAY_HEAD_NODE_IP, } ), state="RUNNING", @@ -372,17 +398,22 @@ class ClusterConstants: head_node_type=TEST_HEAD_NODE_TYPE_1_POOL, worker_node_types=TEST_WORKER_NODE_TYPES_1_POOL, dashboard_address=TEST_VERTEX_RAY_DASHBOARD_ADDRESS, + ray_metric_enabled=True, + ray_logs_enabled=True, ) TEST_CLUSTER_2 = Cluster( cluster_resource_name=TEST_VERTEX_RAY_PR_ADDRESS, python_version="3.10", ray_version="2.9", - network=ProjectConstants.TEST_VPC_NETWORK, + network="", service_account=None, state="RUNNING", head_node_type=TEST_HEAD_NODE_TYPE_2_POOLS, worker_node_types=TEST_WORKER_NODE_TYPES_2_POOLS, dashboard_address=TEST_VERTEX_RAY_DASHBOARD_ADDRESS, + ray_metric_enabled=True, + ray_logs_enabled=True, + psc_interface_config=PscIConfig(network_attachment=TEST_PSC_NETWORK_ATTACHMENT), ) TEST_CLUSTER_CUSTOM_IMAGE = Cluster( cluster_resource_name=TEST_VERTEX_RAY_PR_ADDRESS, @@ -392,6 +423,8 @@ class ClusterConstants: head_node_type=TEST_HEAD_NODE_TYPE_2_POOLS_CUSTOM_IMAGE, worker_node_types=TEST_WORKER_NODE_TYPES_2_POOLS_CUSTOM_IMAGE, dashboard_address=TEST_VERTEX_RAY_DASHBOARD_ADDRESS, + ray_metric_enabled=True, + ray_logs_enabled=True, ) TEST_CLUSTER_BYOSA = Cluster( cluster_resource_name=TEST_VERTEX_RAY_PR_ADDRESS, @@ -403,6 +436,8 @@ class ClusterConstants: head_node_type=TEST_HEAD_NODE_TYPE_1_POOL, worker_node_types=TEST_WORKER_NODE_TYPES_1_POOL, dashboard_address=TEST_VERTEX_RAY_DASHBOARD_ADDRESS, + ray_metric_enabled=True, + ray_logs_enabled=True, ) TEST_BEARER_TOKEN = "test-bearer-token" TEST_HEADERS = {