From 73490b22a239cb1a3c31349f8db6cfbc5232e231 Mon Sep 17 00:00:00 2001 From: Jason Dai Date: Wed, 11 Sep 2024 14:35:57 -0700 Subject: [PATCH] feat: Allow customizing pipeline caching options for model evaluation jobs. PiperOrigin-RevId: 673540795 --- .../pipeline_based_service.py | 12 ++ .../model_evaluation/model_evaluation_job.py | 12 ++ google/cloud/aiplatform/models.py | 12 ++ .../unit/aiplatform/test_model_evaluation.py | 127 ++++++++++++++++++ 4 files changed, 163 insertions(+) diff --git a/google/cloud/aiplatform/_pipeline_based_service/pipeline_based_service.py b/google/cloud/aiplatform/_pipeline_based_service/pipeline_based_service.py index 2ef1db5ad5..fe393926ab 100644 --- a/google/cloud/aiplatform/_pipeline_based_service/pipeline_based_service.py +++ b/google/cloud/aiplatform/_pipeline_based_service/pipeline_based_service.py @@ -269,6 +269,7 @@ def _create_and_submit_pipeline_job( location: Optional[str] = None, credentials: Optional[auth_credentials.Credentials] = None, experiment: Optional[Union[str, "aiplatform.Experiment"]] = None, + enable_caching: Optional[bool] = None, ) -> "_VertexAiPipelineBasedService": """Create a new PipelineJob using the provided template and parameters. @@ -310,6 +311,16 @@ def _create_and_submit_pipeline_job( experiment (Union[str, experiments_resource.Experiment]): Optional. The Vertex AI experiment name or instance to associate to the PipelineJob executing this model evaluation job. + enable_caching (bool): + Optional. Whether to turn on caching for the run. + + If this is not set, defaults to the compile time settings, which + are True for all tasks by default, while users may specify + different caching options for individual tasks. + + If this is set, the setting applies to all tasks in the pipeline. + + Overrides the compile time settings. Returns: (VertexAiPipelineBasedService): Instantiated representation of a Vertex AI Pipeline based service. @@ -334,6 +345,7 @@ def _create_and_submit_pipeline_job( project=project, location=location, credentials=credentials, + enable_caching=enable_caching, ) # Suppresses logs from PipelineJob diff --git a/google/cloud/aiplatform/model_evaluation/model_evaluation_job.py b/google/cloud/aiplatform/model_evaluation/model_evaluation_job.py index bde7e6da39..87f4e44b1f 100644 --- a/google/cloud/aiplatform/model_evaluation/model_evaluation_job.py +++ b/google/cloud/aiplatform/model_evaluation/model_evaluation_job.py @@ -174,6 +174,7 @@ def submit( location: Optional[str] = None, credentials: Optional[auth_credentials.Credentials] = None, experiment: Optional[Union[str, "aiplatform.Experiment"]] = None, + enable_caching: Optional[bool] = None, ) -> "_ModelEvaluationJob": """Submits a Model Evaluation Job using aiplatform.PipelineJob and returns the ModelEvaluationJob resource. @@ -277,6 +278,16 @@ def submit( experiment (Union[str, experiments_resource.Experiment]): Optional. The Vertex AI experiment name or instance to associate to the PipelineJob executing this model evaluation job. + enable_caching (bool): + Optional. Whether to turn on caching for the run. + + If this is not set, defaults to the compile time settings, which + are True for all tasks by default, while users may specify + different caching options for individual tasks. + + If this is set, the setting applies to all tasks in the pipeline. + + Overrides the compile time settings. Returns: (ModelEvaluationJob): Instantiated represnetation of the model evaluation job. """ @@ -351,6 +362,7 @@ def submit( location=location, credentials=credentials, experiment=experiment, + enable_caching=enable_caching, ) _LOGGER.info( diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index dc8e105ef6..6abb482061 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -6883,6 +6883,7 @@ def evaluate( network: Optional[str] = None, encryption_spec_key_name: Optional[str] = None, experiment: Optional[Union[str, "aiplatform.Experiment"]] = None, + enable_caching: Optional[bool] = None, ) -> "model_evaluation._ModelEvaluationJob": """Creates a model evaluation job running on Vertex Pipelines and returns the resulting ModelEvaluationJob resource. @@ -6968,6 +6969,16 @@ def evaluate( this model evaluation job. Metrics produced by the PipelineJob as system.Metric Artifacts will be associated as metrics to the provided experiment, and parameters from this PipelineJob will be associated as parameters to the provided experiment. + enable_caching (bool): + Optional. Whether to turn on caching for the run. + + If this is not set, defaults to the compile time settings, which + are True for all tasks by default, while users may specify + different caching options for individual tasks. + + If this is set, the setting applies to all tasks in the pipeline. + + Overrides the compile time settings. Returns: model_evaluation.ModelEvaluationJob: Instantiated representation of the _ModelEvaluationJob. @@ -7088,6 +7099,7 @@ def evaluate( encryption_spec_key_name=encryption_spec_key_name, credentials=self.credentials, experiment=experiment, + enable_caching=enable_caching, ) diff --git a/tests/unit/aiplatform/test_model_evaluation.py b/tests/unit/aiplatform/test_model_evaluation.py index dcbb8aed93..0117b83458 100644 --- a/tests/unit/aiplatform/test_model_evaluation.py +++ b/tests/unit/aiplatform/test_model_evaluation.py @@ -198,6 +198,39 @@ } ) +_TEST_MODEL_EVAL_PIPELINE_SPEC_WITH_CACHING_OPTIONS_JSON = json.dumps( + { + "pipelineInfo": {"name": "evaluation-default-pipeline"}, + "root": { + "dag": { + "tasks": { + "model-evaluation-text-generation": { + "taskInfo": {"name": "model-evaluation-text-generation"}, + "cachingOptions": {"enableCache": False}, + } + } + }, + "inputDefinitions": { + "parameters": { + "batch_predict_gcs_source_uris": {"type": "STRING"}, + "dataflow_service_account": {"type": "STRING"}, + "batch_predict_instances_format": {"type": "STRING"}, + "batch_predict_machine_type": {"type": "STRING"}, + "evaluation_class_labels": {"type": "STRING"}, + "location": {"type": "STRING"}, + "model_name": {"type": "STRING"}, + "project": {"type": "STRING"}, + "batch_predict_gcs_destination_output_uri": {"type": "STRING"}, + "target_field_name": {"type": "STRING"}, + } + }, + }, + "schemaVersion": "2.0.0", + "sdkVersion": "kfp-1.8.12", + "components": {}, + } +) + _TEST_INVALID_MODEL_EVAL_PIPELINE_SPEC = json.dumps( { "pipelineInfo": {"name": "my-pipeline"}, @@ -1083,6 +1116,100 @@ def test_model_evaluation_job_submit( assert mock_model_eval_job_get.called_once + @pytest.mark.parametrize( + "job_spec", + [_TEST_MODEL_EVAL_PIPELINE_SPEC_WITH_CACHING_OPTIONS_JSON], + ) + @pytest.mark.usefixtures("mock_pipeline_service_create") + def test_model_evaluation_job_submit_with_caching_disabled( + self, + job_spec, + mock_load_yaml_and_json, + mock_model, + get_model_mock, + mock_model_eval_get, + mock_model_eval_job_get, + mock_pipeline_service_get, + mock_model_eval_job_create, + mock_pipeline_bucket_exists, + mock_request_urlopen, + ): + test_model_eval_job = model_evaluation_job._ModelEvaluationJob.submit( + model_name=_TEST_MODEL_RESOURCE_NAME, + prediction_type=_TEST_MODEL_EVAL_PREDICTION_TYPE, + instances_format=_TEST_MODEL_EVAL_PIPELINE_PARAMETER_VALUES[ + "batch_predict_instances_format" + ], + model_type="automl_tabular", + pipeline_root=_TEST_GCS_BUCKET_NAME, + target_field_name=_TEST_MODEL_EVAL_PIPELINE_PARAMETER_VALUES[ + "target_field_name" + ], + evaluation_pipeline_display_name=_TEST_MODEL_EVAL_PIPELINE_JOB_DISPLAY_NAME, + gcs_source_uris=_TEST_MODEL_EVAL_PIPELINE_PARAMETER_VALUES[ + "batch_predict_gcs_source_uris" + ], + job_id=_TEST_PIPELINE_JOB_ID, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + enable_caching=False, + ) + + test_model_eval_job.wait() + + expected_runtime_config_dict = { + "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, + "parameters": { + "batch_predict_gcs_source_uris": { + "stringValue": '["gs://my-bucket/my-prediction-data.csv"]' + }, + "dataflow_service_account": {"stringValue": _TEST_SERVICE_ACCOUNT}, + "batch_predict_instances_format": {"stringValue": "csv"}, + "model_name": {"stringValue": _TEST_MODEL_RESOURCE_NAME}, + "project": {"stringValue": _TEST_PROJECT}, + "location": {"stringValue": _TEST_LOCATION}, + "batch_predict_gcs_destination_output_uri": { + "stringValue": _TEST_GCS_BUCKET_NAME + }, + "target_field_name": {"stringValue": "predict_class"}, + }, + } + + runtime_config = gca_pipeline_job.PipelineJob.RuntimeConfig()._pb + json_format.ParseDict(expected_runtime_config_dict, runtime_config) + + job_spec = yaml.safe_load(job_spec) + pipeline_spec = job_spec.get("pipelineSpec") or job_spec + + # Construct expected request + expected_gapic_pipeline_job = gca_pipeline_job.PipelineJob( + display_name=_TEST_MODEL_EVAL_PIPELINE_JOB_DISPLAY_NAME, + pipeline_spec={ + "components": {}, + "pipelineInfo": pipeline_spec["pipelineInfo"], + "root": pipeline_spec["root"], + "schemaVersion": "2.0.0", + "sdkVersion": "kfp-1.8.12", + }, + runtime_config=runtime_config, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + template_uri=_TEST_KFP_TEMPLATE_URI, + ) + + mock_model_eval_job_create.assert_called_with( + parent=_TEST_PARENT, + pipeline_job=expected_gapic_pipeline_job, + pipeline_job_id=_TEST_PIPELINE_JOB_ID, + timeout=None, + ) + + assert mock_model_eval_job_get.called_once + + assert mock_pipeline_service_get.called_once + + assert mock_model_eval_job_get.called_once + @pytest.mark.parametrize( "job_spec", [_TEST_MODEL_EVAL_PIPELINE_SPEC_JSON],