Skip to content

Commit

Permalink
fix: Always upload the pickled object and dependencies tarball when c…
Browse files Browse the repository at this point in the history
…reating ReasoningEngine

PiperOrigin-RevId: 665166642
  • Loading branch information
yeesian authored and copybara-github committed Aug 20, 2024
1 parent 45e4251 commit 34ef5a3
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 23 deletions.
58 changes: 58 additions & 0 deletions tests/unit/vertex_langchain/test_reasoning_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,64 @@ def setup_method(self):
def teardown_method(self):
initializer.global_pool.shutdown(wait=True)

def test_prepare_create(
self,
cloud_storage_create_bucket_mock,
tarfile_open_mock,
cloudpickle_dump_mock,
):
_reasoning_engines._prepare_create(
reasoning_engine=self.test_app,
requirements=_TEST_REASONING_ENGINE_REQUIREMENTS,
extra_packages=[],
project=_TEST_PROJECT,
location=_TEST_LOCATION,
staging_bucket=_TEST_STAGING_BUCKET,
gcs_dir_name=_TEST_GCS_DIR_NAME,
)
cloudpickle_dump_mock.assert_called() # when preparing object.pkl
tarfile_open_mock.assert_called() # when preparing extra_packages

def test_prepare_update_with_unspecified_extra_packages(
self,
cloud_storage_create_bucket_mock,
cloudpickle_dump_mock,
):
with mock.patch.object(
_reasoning_engines,
"_upload_extra_packages",
) as upload_extra_packages_mock:
_reasoning_engines._prepare_update(
reasoning_engine=self.test_app,
requirements=_TEST_REASONING_ENGINE_REQUIREMENTS,
extra_packages=None,
project=_TEST_PROJECT,
location=_TEST_LOCATION,
staging_bucket=_TEST_STAGING_BUCKET,
gcs_dir_name=_TEST_GCS_DIR_NAME,
)
upload_extra_packages_mock.assert_not_called()

def test_prepare_update_with_empty_extra_packages(
self,
cloud_storage_create_bucket_mock,
cloudpickle_dump_mock,
):
with mock.patch.object(
_reasoning_engines,
"_upload_extra_packages",
) as upload_extra_packages_mock:
_reasoning_engines._prepare_update(
reasoning_engine=self.test_app,
requirements=_TEST_REASONING_ENGINE_REQUIREMENTS,
extra_packages=[],
project=_TEST_PROJECT,
location=_TEST_LOCATION,
staging_bucket=_TEST_STAGING_BUCKET,
gcs_dir_name=_TEST_GCS_DIR_NAME,
)
upload_extra_packages_mock.assert_called() # user wants to override

def test_get_reasoning_engine(self, get_reasoning_engine_mock):
reasoning_engines.ReasoningEngine(_TEST_RESOURCE_ID)
get_reasoning_engine_mock.assert_called_with(
Expand Down
87 changes: 64 additions & 23 deletions vertexai/reasoning_engines/_reasoning_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def create(
# This involves packaging and uploading the artifacts for
# reasoning_engine, requirements and extra_packages to
# `staging_bucket/gcs_dir_name`.
_prepare(
_prepare_create(
reasoning_engine=reasoning_engine,
requirements=requirements,
project=sdk_resource.project,
Expand Down Expand Up @@ -304,11 +304,15 @@ def update(
Args:
reasoning_engine (ReasoningEngineInterface):
Optional. The Reasoning Engine to be replaced.
Optional. The Reasoning Engine to be replaced. If it is not
specified, the existing Reasoning Engine will be used.
requirements (Union[str, Sequence[str]]):
Optional. The set of PyPI dependencies needed. It can either be
the path to a single file (requirements.txt), or an ordered list
of strings corresponding to each line of the requirements file.
If it is not specified, the existing requirements will be used.
If it is set to an empty string or list, the existing
requirements will be removed.
display_name (str):
Optional. The user-defined name of the Reasoning Engine.
The name can be up to 128 characters long and can comprise any
Expand All @@ -322,7 +326,10 @@ def update(
Optional. The Python system version used. Currently updating
sys version is not supported.
extra_packages (Sequence[str]):
Optional. The set of extra user-provided packages (if any).
Optional. The set of extra user-provided packages (if any). If
it is not specified, the existing extra packages will be used.
If it is set to an empty list, the existing extra packages will
be removed.
Returns:
ReasoningEngine: The Reasoning Engine that was updated.
Expand Down Expand Up @@ -359,18 +366,18 @@ def update(
)
if sys_version:
_LOGGER.warning("Updated sys_version is not supported.")
if requirements:
if requirements is not None:
requirements = _validate_requirements_or_raise(requirements)
if extra_packages:
if extra_packages is not None:
extra_packages = _validate_extra_packages_or_raise(extra_packages)
if reasoning_engine:
if reasoning_engine is not None:
reasoning_engine = _validate_reasoning_engine_or_raise(reasoning_engine)

# Prepares the Reasoning Engine for creation in Vertex AI.
# Prepares the Reasoning Engine for update in Vertex AI.
# This involves packaging and uploading the artifacts for
# reasoning_engine, requirements and extra_packages to
# `staging_bucket/gcs_dir_name`.
_prepare(
_prepare_update(
reasoning_engine=reasoning_engine,
requirements=requirements,
project=self.project,
Expand All @@ -392,14 +399,11 @@ def update(
operation_future = self.api_client.update_reasoning_engine(
request=update_request
)
_LOGGER.log_create_with_lro(ReasoningEngine, operation_future)
created_resource = operation_future.result()
_LOGGER.log_create_complete(
ReasoningEngine,
created_resource,
self._resource_noun,
module_name="vertexai.preview.reasoning_engines",
_LOGGER.info(
f"Update ReasoningEngine backing LRO: {operation_future.operation.name}"
)
created_resource = operation_future.result()
_LOGGER.info(f"ReasoningEngine updated. Resource name: {created_resource.name}")
self._operation_schemas = None
return self

Expand Down Expand Up @@ -560,7 +564,7 @@ def _upload_extra_packages(
_LOGGER.info(f"Writing to {dir_name}/{_EXTRA_PACKAGES_FILE}")


def _prepare(
def _prepare_create(
reasoning_engine: Queryable,
requirements: Sequence[str],
extra_packages: Sequence[str],
Expand All @@ -571,7 +575,10 @@ def _prepare(
) -> None:
"""Prepares the reasoning engine for creation in Vertex AI.
This involves packaging and uploading the artifacts to Cloud Storage.
This involves packaging and uploading artifacts to Cloud Storage. Note that
1. This does not actually create the Reasoning Engine in Vertex AI.
2. This will always generate and upload a pickled object.
3. This will always generate and upload the dependencies.tar.gz file.
Args:
reasoning_engine: The reasoning engine to be prepared.
Expand All @@ -584,11 +591,45 @@ def _prepare(
use for staging the artifacts needed.
"""
gcs_bucket = _get_gcs_bucket(project, location, staging_bucket)
if reasoning_engine:
_upload_reasoning_engine(reasoning_engine, gcs_bucket, gcs_dir_name)
_upload_reasoning_engine(reasoning_engine, gcs_bucket, gcs_dir_name)
if requirements:
_upload_requirements(requirements, gcs_bucket, gcs_dir_name)
if extra_packages:
_upload_extra_packages(extra_packages, gcs_bucket, gcs_dir_name)


def _prepare_update(
reasoning_engine: Optional[Queryable],
requirements: Optional[Sequence[str]],
extra_packages: Optional[Sequence[str]],
project: str,
location: str,
staging_bucket: str,
gcs_dir_name: str,
) -> None:
"""Prepares the reasoning engine for updates in Vertex AI.
This involves packaging and uploading artifacts to Cloud Storage. Note that
1. This does not actually update the Reasoning Engine in Vertex AI.
2. This will only generate and upload a pickled object if specified.
3. This will only generate and upload the dependencies.tar.gz file if
extra_packages is non-empty.
Args:
reasoning_engine: The reasoning engine to be prepared.
requirements (Sequence[str]): The set of PyPI dependencies needed.
extra_packages (Sequence[str]): The set of extra user-provided packages.
project (str): The project for the staging bucket.
location (str): The location for the staging bucket.
staging_bucket (str): The staging bucket name in the form "gs://...".
gcs_dir_name (str): The GCS bucket directory under `staging_bucket` to
use for staging the artifacts needed.
"""
gcs_bucket = _get_gcs_bucket(project, location, staging_bucket)
if reasoning_engine is not None:
_upload_reasoning_engine(reasoning_engine, gcs_bucket, gcs_dir_name)
if requirements is not None:
_upload_requirements(requirements, gcs_bucket, gcs_dir_name)
if extra_packages is not None:
_upload_extra_packages(extra_packages, gcs_bucket, gcs_dir_name)


Expand All @@ -607,23 +648,23 @@ def _generate_update_request_or_raise(
update_masks: List[str] = []
reasoning_engine_spec = types.ReasoningEngineSpec()
package_spec = types.ReasoningEngineSpec.PackageSpec()
if requirements:
if requirements is not None:
is_spec_update = True
update_masks.append("spec.package_spec.requirements_gcs_uri")
package_spec.requirements_gcs_uri = "{}/{}/{}".format(
staging_bucket,
gcs_dir_name,
_REQUIREMENTS_FILE,
)
if extra_packages:
if extra_packages is not None:
is_spec_update = True
update_masks.append("spec.package_spec.dependency_files_gcs_uri")
package_spec.dependency_files_gcs_uri = "{}/{}/{}".format(
staging_bucket,
gcs_dir_name,
_EXTRA_PACKAGES_FILE,
)
if reasoning_engine:
if reasoning_engine is not None:
is_spec_update = True
update_masks.append("spec.package_spec.pickle_object_gcs_uri")
package_spec.pickle_object_gcs_uri = "{}/{}/{}".format(
Expand Down

0 comments on commit 34ef5a3

Please sign in to comment.