Commit 73490b2

feat: Allow customizing pipeline caching options for model evaluation jobs.

PiperOrigin-RevId: 673540795
jsondai authored and copybara-github committed Sep 11, 2024
1 parent ef80003 commit 73490b2
Showing 4 changed files with 163 additions and 0 deletions.
@@ -269,6 +269,7 @@ def _create_and_submit_pipeline_job(
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
experiment: Optional[Union[str, "aiplatform.Experiment"]] = None,
enable_caching: Optional[bool] = None,
) -> "_VertexAiPipelineBasedService":
"""Create a new PipelineJob using the provided template and parameters.
@@ -310,6 +311,16 @@ def _create_and_submit_pipeline_job(
experiment (Union[str, experiments_resource.Experiment]):
Optional. The Vertex AI experiment name or instance to associate
to the PipelineJob executing this model evaluation job.
enable_caching (bool):
Optional. Whether to turn on caching for the run.
If this is not set, the run defaults to the compile-time settings,
which are True for all tasks by default; users may specify
different caching options for individual tasks at compile time.
If this is set, the value applies to all tasks in the pipeline
and overrides the compile-time settings.
Returns:
(VertexAiPipelineBasedService):
Instantiated representation of a Vertex AI Pipeline based service.
@@ -334,6 +345,7 @@ def _create_and_submit_pipeline_job(
project=project,
location=location,
credentials=credentials,
enable_caching=enable_caching,
)

# Suppresses logs from PipelineJob
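The hunk above ends by forwarding the new flag into the underlying aiplatform.PipelineJob constructor. A minimal sketch of what that plumbing amounts to, assuming a placeholder display name, template path, and parameter values (none of these are from this commit):

from google.cloud import aiplatform

# All resource names and paths below are hypothetical placeholders.
job = aiplatform.PipelineJob(
    display_name="my-eval-pipeline",
    template_path="gs://my-bucket/eval_pipeline.json",
    parameter_values={"model_name": "projects/my-project/locations/us-central1/models/123"},
    # None (the default) keeps the caching options baked in at compile
    # time; an explicit True/False overrides them for every task.
    enable_caching=False,
)
job.submit()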
12 changes: 12 additions & 0 deletions google/cloud/aiplatform/model_evaluation/model_evaluation_job.py
@@ -174,6 +174,7 @@ def submit(
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
experiment: Optional[Union[str, "aiplatform.Experiment"]] = None,
enable_caching: Optional[bool] = None,
) -> "_ModelEvaluationJob":
"""Submits a Model Evaluation Job using aiplatform.PipelineJob and returns
the ModelEvaluationJob resource.
@@ -277,6 +278,16 @@ def submit(
experiment (Union[str, experiments_resource.Experiment]):
Optional. The Vertex AI experiment name or instance to associate to the PipelineJob executing
this model evaluation job.
enable_caching (bool):
Optional. Whether to turn on caching for the run.
If this is not set, the run defaults to the compile-time settings,
which are True for all tasks by default; users may specify
different caching options for individual tasks at compile time.
If this is set, the value applies to all tasks in the pipeline
and overrides the compile-time settings.
Returns:
(ModelEvaluationJob): Instantiated representation of the model evaluation job.
"""
@@ -351,6 +362,7 @@ def submit(
location=location,
credentials=credentials,
experiment=experiment,
enable_caching=enable_caching,
)

_LOGGER.info(
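As a usage sketch, a call into this entry point mirrors the unit test added later in this commit (all resource names and URIs are placeholders; note that _ModelEvaluationJob is private and Model.evaluate() is the public path):

from google.cloud.aiplatform.model_evaluation import model_evaluation_job

# Placeholder values throughout, mirroring the test added in this commit.
eval_job = model_evaluation_job._ModelEvaluationJob.submit(
    model_name="projects/my-project/locations/us-central1/models/123",
    prediction_type="classification",
    instances_format="csv",
    model_type="automl_tabular",
    pipeline_root="gs://my-bucket",
    target_field_name="predict_class",
    gcs_source_uris=["gs://my-bucket/my-prediction-data.csv"],
    evaluation_pipeline_display_name="my-eval-job",
    enable_caching=False,  # rerun every task instead of reusing cached results
)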
12 changes: 12 additions & 0 deletions google/cloud/aiplatform/models.py
@@ -6883,6 +6883,7 @@ def evaluate(
network: Optional[str] = None,
encryption_spec_key_name: Optional[str] = None,
experiment: Optional[Union[str, "aiplatform.Experiment"]] = None,
enable_caching: Optional[bool] = None,
) -> "model_evaluation._ModelEvaluationJob":
"""Creates a model evaluation job running on Vertex Pipelines and returns the resulting
ModelEvaluationJob resource.
@@ -6968,6 +6969,16 @@ def evaluate(
this model evaluation job. Metrics produced by the PipelineJob as system.Metric Artifacts
will be associated as metrics to the provided experiment, and parameters from this PipelineJob
will be associated as parameters to the provided experiment.
enable_caching (bool):
Optional. Whether to turn on caching for the run.
If this is not set, the run defaults to the compile-time settings,
which are True for all tasks by default; users may specify
different caching options for individual tasks at compile time.
If this is set, the value applies to all tasks in the pipeline
and overrides the compile-time settings.
Returns:
model_evaluation.ModelEvaluationJob: Instantiated representation of the
_ModelEvaluationJob.
@@ -7088,6 +7099,7 @@ def evaluate(
encryption_spec_key_name=encryption_spec_key_name,
credentials=self.credentials,
experiment=experiment,
enable_caching=enable_caching,
)


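On the public surface, the same option flows through Model.evaluate(). A hedged usage sketch, assuming placeholder project, location, model resource name, and data URI:

from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")  # placeholders
model = aiplatform.Model("projects/my-project/locations/us-central1/models/123")

eval_job = model.evaluate(
    prediction_type="classification",
    target_field_name="predict_class",
    gcs_source_uris=["gs://my-bucket/my-prediction-data.csv"],
    enable_caching=False,  # applies to every task, overriding compile-time settings
)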
127 changes: 127 additions & 0 deletions tests/unit/aiplatform/test_model_evaluation.py
@@ -198,6 +198,39 @@
}
)

_TEST_MODEL_EVAL_PIPELINE_SPEC_WITH_CACHING_OPTIONS_JSON = json.dumps(
{
"pipelineInfo": {"name": "evaluation-default-pipeline"},
"root": {
"dag": {
"tasks": {
"model-evaluation-text-generation": {
"taskInfo": {"name": "model-evaluation-text-generation"},
"cachingOptions": {"enableCache": False},
}
}
},
"inputDefinitions": {
"parameters": {
"batch_predict_gcs_source_uris": {"type": "STRING"},
"dataflow_service_account": {"type": "STRING"},
"batch_predict_instances_format": {"type": "STRING"},
"batch_predict_machine_type": {"type": "STRING"},
"evaluation_class_labels": {"type": "STRING"},
"location": {"type": "STRING"},
"model_name": {"type": "STRING"},
"project": {"type": "STRING"},
"batch_predict_gcs_destination_output_uri": {"type": "STRING"},
"target_field_name": {"type": "STRING"},
}
},
},
"schemaVersion": "2.0.0",
"sdkVersion": "kfp-1.8.12",
"components": {},
}
)
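The fixture above shows the compiled form of the override: each task in the DAG carries cachingOptions.enableCache. As a rough sketch of the semantics only (not the SDK's actual implementation), the effect of passing enable_caching can be pictured as:

def apply_caching_override(pipeline_spec: dict, enable_caching: bool) -> dict:
    # Illustrative helper, not part of the SDK: stamp every task in the
    # pipeline DAG with the same cachingOptions setting.
    for task in pipeline_spec["root"]["dag"]["tasks"].values():
        task["cachingOptions"] = {"enableCache": enable_caching}
    return pipeline_spec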

_TEST_INVALID_MODEL_EVAL_PIPELINE_SPEC = json.dumps(
{
"pipelineInfo": {"name": "my-pipeline"},
@@ -1083,6 +1116,100 @@ def test_model_evaluation_job_submit(

assert mock_model_eval_job_get.called_once

@pytest.mark.parametrize(
"job_spec",
[_TEST_MODEL_EVAL_PIPELINE_SPEC_WITH_CACHING_OPTIONS_JSON],
)
@pytest.mark.usefixtures("mock_pipeline_service_create")
def test_model_evaluation_job_submit_with_caching_disabled(
self,
job_spec,
mock_load_yaml_and_json,
mock_model,
get_model_mock,
mock_model_eval_get,
mock_model_eval_job_get,
mock_pipeline_service_get,
mock_model_eval_job_create,
mock_pipeline_bucket_exists,
mock_request_urlopen,
):
test_model_eval_job = model_evaluation_job._ModelEvaluationJob.submit(
model_name=_TEST_MODEL_RESOURCE_NAME,
prediction_type=_TEST_MODEL_EVAL_PREDICTION_TYPE,
instances_format=_TEST_MODEL_EVAL_PIPELINE_PARAMETER_VALUES[
"batch_predict_instances_format"
],
model_type="automl_tabular",
pipeline_root=_TEST_GCS_BUCKET_NAME,
target_field_name=_TEST_MODEL_EVAL_PIPELINE_PARAMETER_VALUES[
"target_field_name"
],
evaluation_pipeline_display_name=_TEST_MODEL_EVAL_PIPELINE_JOB_DISPLAY_NAME,
gcs_source_uris=_TEST_MODEL_EVAL_PIPELINE_PARAMETER_VALUES[
"batch_predict_gcs_source_uris"
],
job_id=_TEST_PIPELINE_JOB_ID,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
enable_caching=False,
)

test_model_eval_job.wait()

expected_runtime_config_dict = {
"gcsOutputDirectory": _TEST_GCS_BUCKET_NAME,
"parameters": {
"batch_predict_gcs_source_uris": {
"stringValue": '["gs://my-bucket/my-prediction-data.csv"]'
},
"dataflow_service_account": {"stringValue": _TEST_SERVICE_ACCOUNT},
"batch_predict_instances_format": {"stringValue": "csv"},
"model_name": {"stringValue": _TEST_MODEL_RESOURCE_NAME},
"project": {"stringValue": _TEST_PROJECT},
"location": {"stringValue": _TEST_LOCATION},
"batch_predict_gcs_destination_output_uri": {
"stringValue": _TEST_GCS_BUCKET_NAME
},
"target_field_name": {"stringValue": "predict_class"},
},
}

runtime_config = gca_pipeline_job.PipelineJob.RuntimeConfig()._pb
json_format.ParseDict(expected_runtime_config_dict, runtime_config)

job_spec = yaml.safe_load(job_spec)
pipeline_spec = job_spec.get("pipelineSpec") or job_spec

# Construct expected request
expected_gapic_pipeline_job = gca_pipeline_job.PipelineJob(
display_name=_TEST_MODEL_EVAL_PIPELINE_JOB_DISPLAY_NAME,
pipeline_spec={
"components": {},
"pipelineInfo": pipeline_spec["pipelineInfo"],
"root": pipeline_spec["root"],
"schemaVersion": "2.0.0",
"sdkVersion": "kfp-1.8.12",
},
runtime_config=runtime_config,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
template_uri=_TEST_KFP_TEMPLATE_URI,
)

mock_model_eval_job_create.assert_called_with(
parent=_TEST_PARENT,
pipeline_job=expected_gapic_pipeline_job,
pipeline_job_id=_TEST_PIPELINE_JOB_ID,
timeout=None,
)

assert mock_model_eval_job_get.called_once

assert mock_pipeline_service_get.called_once

@pytest.mark.parametrize(
"job_spec",
[_TEST_MODEL_EVAL_PIPELINE_SPEC_JSON],