From 7b7d7d2f2c7f074e62ee009a308341228fcd6582 Mon Sep 17 00:00:00 2001
From: A Vertex SDK engineer
Date: Wed, 29 Nov 2023 14:20:56 -0800
Subject: [PATCH] feat: add additional parameters to Model.upload().

PiperOrigin-RevId: 586452396
---
 google/cloud/aiplatform/models.py             | 75 ++++++++++++++++++
 .../aiplatform/prediction/local_model.py      | 76 +++++++++++++++++++
 .../system/aiplatform/test_prediction_cpr.py  |  2 +
 tests/unit/aiplatform/test_models.py          | 47 +++++++++++-
 4 files changed, 199 insertions(+), 1 deletion(-)

diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py
index 282fc45bb7..8a37a4c535 100644
--- a/google/cloud/aiplatform/models.py
+++ b/google/cloud/aiplatform/models.py
@@ -36,6 +36,7 @@
 from google.api_core import exceptions as api_exceptions
 from google.auth import credentials as auth_credentials
 from google.auth.transport import requests as google_auth_requests
+from google.protobuf import duration_pb2
 import proto
 
 from google.cloud import aiplatform
@@ -2974,6 +2975,14 @@ def upload(
         staging_bucket: Optional[str] = None,
         sync=True,
         upload_request_timeout: Optional[float] = None,
+        serving_container_deployment_timeout: Optional[int] = None,
+        serving_container_shared_memory_size_mb: Optional[int] = None,
+        serving_container_startup_probe_exec: Optional[Sequence[str]] = None,
+        serving_container_startup_probe_period_seconds: Optional[int] = None,
+        serving_container_startup_probe_timeout_seconds: Optional[int] = None,
+        serving_container_health_probe_exec: Optional[Sequence[str]] = None,
+        serving_container_health_probe_period_seconds: Optional[int] = None,
+        serving_container_health_probe_timeout_seconds: Optional[int] = None,
     ) -> "Model":
         """Uploads a model and returns a Model representing the uploaded
         Model resource.
@@ -3153,6 +3162,31 @@ def upload(
                 staging_bucket set in aiplatform.init.
             upload_request_timeout (float):
                 Optional. The timeout for the upload request in seconds.
+            serving_container_deployment_timeout (int):
+                Optional. Deployment timeout in seconds.
+            serving_container_shared_memory_size_mb (int):
+                Optional. The amount of the VM memory to reserve as the shared
+                memory for the model in megabytes.
+            serving_container_startup_probe_exec (Sequence[str]):
+                Optional. Exec specifies the action to take. Used by the
+                startup probe. An example of this argument would be
+                ["cat", "/tmp/healthy"].
+            serving_container_startup_probe_period_seconds (int):
+                Optional. How often (in seconds) to perform the startup probe.
+                Defaults to 10 seconds. Minimum value is 1.
+            serving_container_startup_probe_timeout_seconds (int):
+                Optional. Number of seconds after which the startup probe times
+                out. Defaults to 1 second. Minimum value is 1.
+            serving_container_health_probe_exec (Sequence[str]):
+                Optional. Exec specifies the action to take. Used by the
+                health probe. An example of this argument would be
+                ["cat", "/tmp/healthy"].
+            serving_container_health_probe_period_seconds (int):
+                Optional. How often (in seconds) to perform the health probe.
+                Defaults to 10 seconds. Minimum value is 1.
+            serving_container_health_probe_timeout_seconds (int):
+                Optional. Number of seconds after which the health probe times
+                out. Defaults to 1 second. Minimum value is 1.
 
         Returns:
             model (aiplatform.Model):
@@ -3187,6 +3221,13 @@ def upload(
 
         env = None
         ports = None
+        deployment_timeout = (
+            duration_pb2.Duration(seconds=serving_container_deployment_timeout)
+            if serving_container_deployment_timeout
+            else None
+        )
+        startup_probe = None
+        health_probe = None
 
         if serving_container_environment_variables:
             env = [
@@ -3198,6 +3239,36 @@ def upload(
                 gca_model_compat.Port(container_port=port)
                 for port in serving_container_ports
             ]
+        if (
+            serving_container_startup_probe_exec
+            or serving_container_startup_probe_period_seconds
+            or serving_container_startup_probe_timeout_seconds
+        ):
+            startup_probe_exec = None
+            if serving_container_startup_probe_exec:
+                startup_probe_exec = gca_model_compat.Probe.ExecAction(
+                    command=serving_container_startup_probe_exec
+                )
+            startup_probe = gca_model_compat.Probe(
+                exec=startup_probe_exec,
+                period_seconds=serving_container_startup_probe_period_seconds,
+                timeout_seconds=serving_container_startup_probe_timeout_seconds,
+            )
+        if (
+            serving_container_health_probe_exec
+            or serving_container_health_probe_period_seconds
+            or serving_container_health_probe_timeout_seconds
+        ):
+            health_probe_exec = None
+            if serving_container_health_probe_exec:
+                health_probe_exec = gca_model_compat.Probe.ExecAction(
+                    command=serving_container_health_probe_exec
+                )
+            health_probe = gca_model_compat.Probe(
+                exec=health_probe_exec,
+                period_seconds=serving_container_health_probe_period_seconds,
+                timeout_seconds=serving_container_health_probe_timeout_seconds,
+            )
 
         container_spec = gca_model_compat.ModelContainerSpec(
             image_uri=serving_container_image_uri,
@@ -3207,6 +3278,10 @@ def upload(
             ports=ports,
             predict_route=serving_container_predict_route,
             health_route=serving_container_health_route,
+            deployment_timeout=deployment_timeout,
+            shared_memory_size_mb=serving_container_shared_memory_size_mb,
+            startup_probe=startup_probe,
+            health_probe=health_probe,
         )
 
         model_predict_schemata = None
diff --git a/google/cloud/aiplatform/prediction/local_model.py b/google/cloud/aiplatform/prediction/local_model.py
index c44cd258ae..20b52527e3 100644
--- a/google/cloud/aiplatform/prediction/local_model.py
+++ b/google/cloud/aiplatform/prediction/local_model.py
@@ -36,6 +36,8 @@
 from google.cloud.aiplatform.prediction.predictor import Predictor
 from google.cloud.aiplatform.utils import prediction_utils
 
+from google.protobuf import duration_pb2
+
 DEFAULT_PREDICT_ROUTE = "/predict"
 DEFAULT_HEALTH_ROUTE = "/health"
 DEFAULT_HTTP_PORT = 8080
@@ -58,6 +60,14 @@ def __init__(
         serving_container_args: Optional[Sequence[str]] = None,
         serving_container_environment_variables: Optional[Dict[str, str]] = None,
         serving_container_ports: Optional[Sequence[int]] = None,
+        serving_container_deployment_timeout: Optional[int] = None,
+        serving_container_shared_memory_size_mb: Optional[int] = None,
+        serving_container_startup_probe_exec: Optional[Sequence[str]] = None,
+        serving_container_startup_probe_period_seconds: Optional[int] = None,
+        serving_container_startup_probe_timeout_seconds: Optional[int] = None,
+        serving_container_health_probe_exec: Optional[Sequence[str]] = None,
+        serving_container_health_probe_period_seconds: Optional[int] = None,
+        serving_container_health_probe_timeout_seconds: Optional[int] = None,
     ):
         """Creates a local model instance.
 
@@ -100,6 +110,31 @@ def __init__(
                 no impact on whether the port is actually exposed, any port
                 listening on the default "0.0.0.0" address inside a container
                 will be accessible from the network.
+            serving_container_deployment_timeout (int):
+                Optional. Deployment timeout in seconds.
+            serving_container_shared_memory_size_mb (int):
+                Optional. The amount of the VM memory to reserve as the shared
+                memory for the model in megabytes.
+            serving_container_startup_probe_exec (Sequence[str]):
+                Optional. Exec specifies the action to take. Used by the
+                startup probe. An example of this argument would be
+                ["cat", "/tmp/healthy"].
+            serving_container_startup_probe_period_seconds (int):
+                Optional. How often (in seconds) to perform the startup probe.
+                Defaults to 10 seconds. Minimum value is 1.
+            serving_container_startup_probe_timeout_seconds (int):
+                Optional. Number of seconds after which the startup probe times
+                out. Defaults to 1 second. Minimum value is 1.
+            serving_container_health_probe_exec (Sequence[str]):
+                Optional. Exec specifies the action to take. Used by the
+                health probe. An example of this argument would be
+                ["cat", "/tmp/healthy"].
+            serving_container_health_probe_period_seconds (int):
+                Optional. How often (in seconds) to perform the health probe.
+                Defaults to 10 seconds. Minimum value is 1.
+            serving_container_health_probe_timeout_seconds (int):
+                Optional. Number of seconds after which the health probe times
+                out. Defaults to 1 second. Minimum value is 1.
 
         Raises:
             ValueError: If ``serving_container_spec`` is specified but
                 ``serving_container_spec.image_uri``
@@ -121,6 +156,13 @@ def __init__(
 
         env = None
         ports = None
+        deployment_timeout = (
+            duration_pb2.Duration(seconds=serving_container_deployment_timeout)
+            if serving_container_deployment_timeout
+            else None
+        )
+        startup_probe = None
+        health_probe = None
 
         if serving_container_environment_variables:
             env = [
@@ -132,6 +174,36 @@ def __init__(
                 gca_model_compat.Port(container_port=port)
                 for port in serving_container_ports
             ]
+        if (
+            serving_container_startup_probe_exec
+            or serving_container_startup_probe_period_seconds
+            or serving_container_startup_probe_timeout_seconds
+        ):
+            startup_probe_exec = None
+            if serving_container_startup_probe_exec:
+                startup_probe_exec = gca_model_compat.Probe.ExecAction(
+                    command=serving_container_startup_probe_exec
+                )
+            startup_probe = gca_model_compat.Probe(
+                exec=startup_probe_exec,
+                period_seconds=serving_container_startup_probe_period_seconds,
+                timeout_seconds=serving_container_startup_probe_timeout_seconds,
+            )
+        if (
+            serving_container_health_probe_exec
+            or serving_container_health_probe_period_seconds
+            or serving_container_health_probe_timeout_seconds
+        ):
+            health_probe_exec = None
+            if serving_container_health_probe_exec:
+                health_probe_exec = gca_model_compat.Probe.ExecAction(
+                    command=serving_container_health_probe_exec
+                )
+            health_probe = gca_model_compat.Probe(
+                exec=health_probe_exec,
+                period_seconds=serving_container_health_probe_period_seconds,
+                timeout_seconds=serving_container_health_probe_timeout_seconds,
+            )
 
         self.serving_container_spec = gca_model_compat.ModelContainerSpec(
             image_uri=serving_container_image_uri,
@@ -141,6 +213,10 @@ def __init__(
             ports=ports,
             predict_route=serving_container_predict_route,
             health_route=serving_container_health_route,
+            deployment_timeout=deployment_timeout,
+            shared_memory_size_mb=serving_container_shared_memory_size_mb,
+            startup_probe=startup_probe,
+            health_probe=health_probe,
         )
 
     @classmethod
diff --git a/tests/system/aiplatform/test_prediction_cpr.py b/tests/system/aiplatform/test_prediction_cpr.py
index 3095206766..9f12e939af 100644
--- a/tests/system/aiplatform/test_prediction_cpr.py
+++ b/tests/system/aiplatform/test_prediction_cpr.py
@@ -97,6 +97,8 @@ def test_build_cpr_model_upload_and_deploy(self, shared_state, caplog):
             local_model=local_model,
             display_name=f"cpr_e2e_test_{_TIMESTAMP}",
             artifact_uri=_ARTIFACT_URI,
+            serving_container_deployment_timeout=3600,
+            serving_container_shared_memory_size_mb=20,
         )
 
         shared_state["resources"] = [model]
diff --git a/tests/unit/aiplatform/test_models.py b/tests/unit/aiplatform/test_models.py
index bcbdcbd413..b91fc2d9a1 100644
--- a/tests/unit/aiplatform/test_models.py
+++ b/tests/unit/aiplatform/test_models.py
@@ -78,7 +78,12 @@
 from google.cloud.aiplatform_v1 import Execution as GapicExecution
 from google.cloud.aiplatform.model_evaluation import model_evaluation_job
 
-from google.protobuf import field_mask_pb2, struct_pb2, timestamp_pb2
+from google.protobuf import (
+    field_mask_pb2,
+    struct_pb2,
+    timestamp_pb2,
+    duration_pb2,
+)
 
 import constants as test_constants
 
@@ -108,6 +113,14 @@
     "loss_fn": "mse",
 }
 _TEST_SERVING_CONTAINER_PORTS = [8888, 10000]
+_TEST_SERVING_CONTAINER_DEPLOYMENT_TIMEOUT = 100
+_TEST_SERVING_CONTAINER_SHARED_MEMORY_SIZE_MB = 1000
+_TEST_SERVING_CONTAINER_STARTUP_PROBE_EXEC = ["a", "b"]
+_TEST_SERVING_CONTAINER_STARTUP_PROBE_PERIOD_SECONDS = 5
+_TEST_SERVING_CONTAINER_STARTUP_PROBE_TIMEOUT_SECONDS = 100
+_TEST_SERVING_CONTAINER_HEALTH_PROBE_EXEC = ["c", "d"]
+_TEST_SERVING_CONTAINER_HEALTH_PROBE_PERIOD_SECONDS = 20
+_TEST_SERVING_CONTAINER_HEALTH_PROBE_TIMEOUT_SECONDS = 200
 _TEST_ID = "1028944691210842416"
 _TEST_LABEL = test_constants.ProjectConstants._TEST_LABELS
 _TEST_APPENDED_USER_AGENT = ["fake_user_agent", "another_fake_user_agent"]
@@ -1598,6 +1611,14 @@ def test_upload_uploads_and_gets_model_with_all_args(
             labels=_TEST_LABEL,
             sync=sync,
             upload_request_timeout=None,
+            serving_container_deployment_timeout=_TEST_SERVING_CONTAINER_DEPLOYMENT_TIMEOUT,
+            serving_container_shared_memory_size_mb=_TEST_SERVING_CONTAINER_SHARED_MEMORY_SIZE_MB,
+            serving_container_startup_probe_exec=_TEST_SERVING_CONTAINER_STARTUP_PROBE_EXEC,
+            serving_container_startup_probe_period_seconds=_TEST_SERVING_CONTAINER_STARTUP_PROBE_PERIOD_SECONDS,
+            serving_container_startup_probe_timeout_seconds=_TEST_SERVING_CONTAINER_STARTUP_PROBE_TIMEOUT_SECONDS,
+            serving_container_health_probe_exec=_TEST_SERVING_CONTAINER_HEALTH_PROBE_EXEC,
+            serving_container_health_probe_period_seconds=_TEST_SERVING_CONTAINER_HEALTH_PROBE_PERIOD_SECONDS,
+            serving_container_health_probe_timeout_seconds=_TEST_SERVING_CONTAINER_HEALTH_PROBE_TIMEOUT_SECONDS,
         )
 
         if not sync:
@@ -1613,6 +1634,26 @@ def test_upload_uploads_and_gets_model_with_all_args(
             for port in _TEST_SERVING_CONTAINER_PORTS
         ]
 
+        deployment_timeout = duration_pb2.Duration(
+            seconds=_TEST_SERVING_CONTAINER_DEPLOYMENT_TIMEOUT
+        )
+
+        startup_probe = gca_model.Probe(
+            exec=gca_model.Probe.ExecAction(
+                command=_TEST_SERVING_CONTAINER_STARTUP_PROBE_EXEC
+            ),
+            period_seconds=_TEST_SERVING_CONTAINER_STARTUP_PROBE_PERIOD_SECONDS,
+            timeout_seconds=_TEST_SERVING_CONTAINER_STARTUP_PROBE_TIMEOUT_SECONDS,
+        )
+
+        health_probe = gca_model.Probe(
+            exec=gca_model.Probe.ExecAction(
+                command=_TEST_SERVING_CONTAINER_HEALTH_PROBE_EXEC
+            ),
+            period_seconds=_TEST_SERVING_CONTAINER_HEALTH_PROBE_PERIOD_SECONDS,
+            timeout_seconds=_TEST_SERVING_CONTAINER_HEALTH_PROBE_TIMEOUT_SECONDS,
+        )
+
         container_spec = gca_model.ModelContainerSpec(
             image_uri=_TEST_SERVING_CONTAINER_IMAGE,
             predict_route=_TEST_SERVING_CONTAINER_PREDICTION_ROUTE,
@@ -1621,6 +1662,10 @@ def test_upload_uploads_and_gets_model_with_all_args(
             args=_TEST_SERVING_CONTAINER_ARGS,
             env=env,
             ports=ports,
+            deployment_timeout=deployment_timeout,
+            shared_memory_size_mb=_TEST_SERVING_CONTAINER_SHARED_MEMORY_SIZE_MB,
+            startup_probe=startup_probe,
+            health_probe=health_probe,
         )
 
         managed_model = gca_model.Model(
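
A usage sketch for reviewers (illustrative, not part of the patch): after this change the new deployment-timeout, shared-memory, and probe settings pass straight through Model.upload(), and LocalModel() accepts the same keyword arguments. The project, bucket, and image values below are placeholders, and the probe command reuses the ["cat", "/tmp/healthy"] example from the docstrings.

    from google.cloud import aiplatform

    aiplatform.init(project="my-project", location="us-central1")  # placeholder project/region

    model = aiplatform.Model.upload(
        display_name="my-model",                                       # placeholder
        artifact_uri="gs://my-bucket/model-artifacts/",                # placeholder
        serving_container_image_uri="gcr.io/my-project/serve:latest",  # placeholder
        # Parameters introduced by this patch:
        serving_container_deployment_timeout=1800,
        serving_container_shared_memory_size_mb=256,
        serving_container_startup_probe_exec=["cat", "/tmp/healthy"],
        serving_container_startup_probe_period_seconds=10,
        serving_container_startup_probe_timeout_seconds=5,
        serving_container_health_probe_exec=["cat", "/tmp/healthy"],
        serving_container_health_probe_period_seconds=10,
        serving_container_health_probe_timeout_seconds=5,
    )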