From c22622b57568f9b71f41f4f98111d6ef61d04ef1 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Tue, 13 Dec 2022 16:21:05 -0800 Subject: [PATCH 1/9] Add s3 block --- prefect_aws/client_parameters.py | 17 ++- prefect_aws/credentials.py | 23 ++++ prefect_aws/s3.py | 228 ++++++++++++++++++++++++++++++- 3 files changed, 261 insertions(+), 7 deletions(-) diff --git a/prefect_aws/client_parameters.py b/prefect_aws/client_parameters.py index bf6d58fb..61229d89 100644 --- a/prefect_aws/client_parameters.py +++ b/prefect_aws/client_parameters.py @@ -1,14 +1,12 @@ """Module handling Client parameters""" -import dataclasses -from dataclasses import dataclass from typing import Any, Dict, Optional, Union from botocore.client import Config +from prefect.blocks.core import Block -@dataclass(frozen=True) -class AwsClientParameters: +class AwsClientParameters(Block): """ Dataclass used to manage extra parameters that you can pass when you initialize the Client. If you want to find more information, see @@ -50,10 +48,17 @@ class AwsClientParameters: use_ssl: Optional[bool] = None verify: Optional[Union[bool, str]] = None endpoint_url: Optional[str] = None - config: Optional[Config] = None + config: Optional[Dict[str, Any]] = None def get_params_override(self) -> Dict[str, Any]: """ Return the dictionary of the parameters to override. The parameters to override are the one which are not None. """ # noqa E501 - return {k: v for k, v in dataclasses.asdict(self).items() if v is not None} + params_override = {} + for key, value in self.dict().items(): + if value is None: + continue + if key == "config": + value = Config(**value) + params_override[key] = value + return params_override diff --git a/prefect_aws/credentials.py b/prefect_aws/credentials.py index 882e95fb..8521c505 100644 --- a/prefect_aws/credentials.py +++ b/prefect_aws/credentials.py @@ -3,9 +3,12 @@ from typing import Optional import boto3 +from mypy_boto3_s3 import S3Client from prefect.blocks.core import Block from pydantic import Field, SecretStr +from prefect_aws.client_parameters import AwsClientParameters + class AwsCredentials(Block): """ @@ -46,6 +49,10 @@ class AwsCredentials(Block): default=None, description="The AWS Region where you want to create new connections.", ) + aws_client_parameters: AwsClientParameters = Field( + default_factory=AwsClientParameters, + description="Extra parameters to initialize the Client.", + ) def get_boto3_session(self) -> boto3.Session: """ @@ -76,6 +83,12 @@ def get_boto3_session(self) -> boto3.Session: region_name=self.region_name, ) + def get_s3_client(self) -> S3Client: + client = self.get_boto3_session().client( + service_name="s3", **self.aws_client_parameters.get_params_override() + ) + return client + class MinIOCredentials(Block): """ @@ -108,6 +121,10 @@ class MinIOCredentials(Block): minio_root_user: str minio_root_password: SecretStr region_name: Optional[str] = None + aws_client_parameters: AwsClientParameters = Field( + default_factory=AwsClientParameters, + description="Extra parameters to initialize the Client.", + ) def get_boto3_session(self) -> boto3.Session: """ @@ -140,3 +157,9 @@ def get_boto3_session(self) -> boto3.Session: aws_secret_access_key=minio_root_password, region_name=self.region_name, ) + + def get_s3_client(self) -> boto3.client: + client = self.get_boto3_session().client( + service_name="s3", **self.aws_client_parameters.get_params_override() + ) + return client diff --git a/prefect_aws/s3.py b/prefect_aws/s3.py index f4b1851a..536f408e 100644 --- a/prefect_aws/s3.py +++ 
b/prefect_aws/s3.py @@ -3,12 +3,13 @@ import os import uuid from pathlib import Path -from typing import Any, Dict, List, Optional, Union +from typing import Any, BinaryIO, Dict, List, Optional, Union from uuid import uuid4 import boto3 from botocore.paginate import PageIterator from prefect import get_run_logger, task +from prefect.blocks.abstract import ObjectStorageBlock from prefect.filesystems import WritableDeploymentStorage, WritableFileSystem from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible from prefect.utilities.filesystem import filter_files @@ -576,3 +577,228 @@ def _write_sync(self, key: str, data: bytes) -> None: with io.BytesIO(data) as stream: s3_client.upload_fileobj(Fileobj=stream, Bucket=self.bucket_name, Key=key) + + +class S3(ObjectStorageBlock): + + aws_credentials: Union[MinIOCredentials, AwsCredentials] + bucket_name: str = Field(..., description="The name of the S3 bucket.") + + def _list_objects_sync(self, page_iterator: PageIterator) -> List[Dict[str, Any]]: + """ + Synchronous method to collect S3 objects into a list + + Args: + page_iterator: AWS Paginator for S3 objects + + Returns: + List[Dict]: List of object information + """ + return [ + content for page in page_iterator for content in page.get("Contents", []) + ] + + @sync_compatible + async def list_blobs( + self, + folder: str, + delimiter: str = "", + page_size: Optional[int] = None, + max_items: Optional[int] = None, + jmespath_query: Optional[str] = None, + ) -> List[Dict[str, Any]]: + """ + Args: + bucket: Name of bucket to list items from. Required if a default value + was not supplied when creating the task. + aws_credentials: Credentials to use for authentication with AWS. + aws_client_parameters: Custom parameter for the boto3 client initialization. + prefix: Used to filter objects with keys starting with the specified prefix. + delimiter: Character used to group keys of listed objects. + page_size: Number of objects to return in each request to the AWS API. + max_items: Maximum number of objects that to be returned by task. + jmespath_query: Query used to filter objects based on object attributes refer to + the [boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html#filtering-results-with-jmespath) + for more information on how to construct queries. + """ # noqa: E501 + client = self.aws_credentials.get_s3_client() + paginator = client.get_paginator("list_objects_v2") + page_iterator = paginator.paginate( + Bucket=self.bucket_name, + Prefix=folder, + Delimiter=delimiter, + PaginationConfig={"PageSize": page_size, "MaxItems": max_items}, + ) + if jmespath_query: + page_iterator = page_iterator.search(f"{jmespath_query} | {{Contents: @}}") + + return await run_sync_in_worker_thread(self._list_objects_sync, page_iterator) + + @sync_compatible + async def download_object_to_path( + self, + from_path: str, + to_path: Union[str, Path], + **download_kwargs: Dict[str, Any], + ) -> Path: + """ + Downloads an object from the S3 bucket to a path. + + Args: + from_path: The path to download from. + to_path: The path to download to. + **download_kwargs: Additional keyword arguments to pass to download. + + Returns: + The path that the object was downloaded to. 
+ """ + client = self.aws_credentials.get_s3_client() + run_sync_in_worker_thread( + client.download_file, + self.bucket_name, + from_path, + to_path, + **download_kwargs, + ) + return Path(to_path) + + @sync_compatible + async def download_object_to_file_object( + self, + from_path: str, + to_file_object: BinaryIO, + **download_kwargs: Dict[str, Any], + ) -> BinaryIO: + """ + Downloads an object from the S3 bucket to a file-like object, + which can be a BytesIO object or a BufferedWriter. + + Args: + from_path: The path to download from. + to_file_object: The file-like object to download to. + **download_kwargs: Additional keyword arguments to pass to download. + + Returns: + The file-like object that the object was downloaded to. + """ + client = self.aws_credentials.get_s3_client() + run_sync_in_worker_thread( + client.download_fileobj, + Bucket=self.bucket_name, + Key=from_path, + Fileobj=to_file_object, + **download_kwargs, + ) + return to_file_object + + @sync_compatible + async def download_folder_to_path( + self, + from_folder: str, + to_folder: Union[str, Path], + **download_kwargs: Dict[str, Any], + ) -> Path: + """ + Downloads a folder (up to a 1000 objects) from the S3 bucket to a path. + + Args: + from_folder: The path to the folder to download from. + to_folder: The path to download the folder to. + **download_kwargs: Additional keyword arguments to pass to download. + + Returns: + The path that the folder was downloaded to. + """ + client = self.aws_credentials.get_s3_client() + objects = client.list_objects_v2(Bucket=self.bucket_name, Prefix=from_folder) + for object in objects["Contents"]: + path = Path(to_folder) / object["Key"] + path.parent.mkdir(parents=True, exist_ok=True) + run_sync_in_worker_thread( + client.download_file, + Bucket=self.bucket_name, + Key=object["Key"], + Filename=path, + **download_kwargs, + ) + return Path(to_folder) + + @sync_compatible + async def upload_from_path( + self, from_path: Union[str, Path], to_path: str, **upload_kwargs: Dict[str, Any] + ) -> str: + """ + Uploads an object from a path to the S3 bucket. + + Args: + from_path: The path to the file to upload from. + to_path: The path to upload the file to. + **upload_kwargs: Additional keyword arguments to pass to upload. + + Returns: + The path that the object was uploaded to. + """ + client = self.aws_credentials.get_s3_client() + run_sync_in_worker_thread( + client.upload_file, + Filename=from_path, + Bucket=self.bucket_name, + Key=to_path, + **upload_kwargs, + ) + return to_path + + @sync_compatible + async def upload_from_file_object( + self, from_file_object: BinaryIO, to_path: str, **upload_kwargs + ) -> str: + """ + Uploads an object to the S3 bucket from a file-like object, + which can be a BytesIO object or a BufferedReader. + + Args: + from_file_object: The file-like object to upload from. + to_path: The path to upload the object to. + **upload_kwargs: Additional keyword arguments to pass to upload. + Returns: + The path that the object was uploaded to. + """ + client = self.aws_credentials.get_s3_client() + run_sync_in_worker_thread( + client.upload_fileobj, + Fileobj=from_file_object, + Bucket=self.bucket_name, + Key=to_path, + **upload_kwargs, + ) + return to_path + + @sync_compatible + async def upload_from_folder( + self, + from_folder: Union[str, Path], + to_folder: str, + **upload_kwargs: Dict[str, Any], + ) -> str: + """ + Uploads a folder to the S3 bucket from a path. + + Args: + from_folder: The path to the folder to upload from. 
+ to_folder: The path to upload the folder to. + **upload_kwargs: Additional keyword arguments to pass to upload. + + Returns: + The path that the folder was uploaded to. + """ + client = self.aws_credentials.get_s3_client() + for path in Path(from_folder).rglob("*"): + if path.is_file(): + run_sync_in_worker_thread( + client.upload_file, + Filename=path, + Bucket=self.bucket_name, + Key=str(Path(to_folder) / path.relative_to(from_folder)), + **upload_kwargs, + ) + return to_folder From 2171d59f13ad7db6d28f7d532c8897cc815dcc4b Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Tue, 13 Dec 2022 20:18:54 -0800 Subject: [PATCH 2/9] Add secretsmanager --- prefect_aws/credentials.py | 10 ++- prefect_aws/secrets_manager.py | 108 ++++++++++++++++++++++++++++++++- 2 files changed, 116 insertions(+), 2 deletions(-) diff --git a/prefect_aws/credentials.py b/prefect_aws/credentials.py index 8521c505..cfe46b58 100644 --- a/prefect_aws/credentials.py +++ b/prefect_aws/credentials.py @@ -4,6 +4,7 @@ import boto3 from mypy_boto3_s3 import S3Client +from mypy_boto3_secretsmanager import SecretsManagerClient from prefect.blocks.core import Block from pydantic import Field, SecretStr @@ -158,8 +159,15 @@ def get_boto3_session(self) -> boto3.Session: region_name=self.region_name, ) - def get_s3_client(self) -> boto3.client: + def get_s3_client(self) -> S3Client: client = self.get_boto3_session().client( service_name="s3", **self.aws_client_parameters.get_params_override() ) return client + + def get_secrets_manager_client(self) -> SecretsManagerClient: + client = self.get_boto3_session().client( + service_name="secretsmanager", + **self.aws_client_parameters.get_params_override() + ) + return client diff --git a/prefect_aws/secrets_manager.py b/prefect_aws/secrets_manager.py index 4209c48b..bfff2d53 100644 --- a/prefect_aws/secrets_manager.py +++ b/prefect_aws/secrets_manager.py @@ -3,9 +3,11 @@ from botocore.exceptions import ClientError from prefect import get_run_logger, task +from prefect.blocks.abstract import SecretBlock from prefect.utilities.asyncutils import run_sync_in_worker_thread +from pydantic import Field -from prefect_aws import AwsCredentials +from prefect_aws import AwsCredentials, MinIOCredentials @task @@ -350,3 +352,107 @@ def example_delete_secret_with_recovery_window(): except ClientError: logger.exception("Unable to delete secret %s", secret_name) raise + + +class SecretsManager(SecretBlock): + """ + Block that represents a resource that can store and retrieve secrets. + """ + + aws_credentials: Union[AwsCredentials, MinIOCredentials] + secret_name: str = Field(default=..., description="The name of the secret.") + + async def read_secret( + self, version_id: str = None, version_stage: str = None + ) -> Union[str, bytes]: + """ + Reads the secret from the secret storage service. + + Args: + version_id: The version of the secret to read. If not provided, the latest + version will be read. + version_stage: The version stage of the secret to read. If not provided, + the latest version will be read. + + Returns: + The secret data. + """ + client = self.aws_credentials.get_secrets_manager_client() + response = await run_sync_in_worker_thread( + client.get_secret_value, + SecretId=self.secret_name, + VersionId=version_id, + version_stage=version_stage, + ) + secret = response.get("SecretString") or response.get("SecretBinary") + return secret + + async def write_secret(self, secret_data: bytes) -> str: + """ + Writes the secret to the secret storage service. 
+ + Args: + secret_data: The secret data to write. + + Returns: + The path that the secret was written to. + """ + client = self.aws_credentials.get_secrets_manager_client() + response = await run_sync_in_worker_thread( + client.put_secret_value, SecretId=self.secret_name, SecretBinary=secret_data + ) + return response["name"] + + async def update_secret( + self, secret_data: bytes, description: Optional[str] = None + ) -> str: + """ + Updates the secret to the secret storage service. + + Args: + secret_data: The secret data to update. + + Returns: + The path that the secret was updated to. + """ + client = self.aws_credentials.get_secrets_manager_client() + response = await run_sync_in_worker_thread( + client.update_secret, + SecretId=self.secret_name, + SecretBinary=secret_data, + Description=description, + ) + return response["name"] + + async def delete_secret( + self, + recovery_window_in_days: Optional[int] = None, + force_delete_without_recovery: bool = False, + ) -> str: + """ + Deletes the secret from the secret storage service. + + Args: + recovery_window_in_days: The number of days to wait before permanently + deleting the secret. Must be between 7 and 30 days. + force_delete_without_recovery: If True, the secret will be deleted + immediately without a recovery window. + + Returns: + The path that the secret was deleted from. + """ + if force_delete_without_recovery and recovery_window_in_days: + raise ValueError( + "Cannot specify recovery window and force delete without recovery." + ) + elif 7 <= recovery_window_in_days <= 30: + raise ValueError("Recovery window must be between 7 and 30 days.") + + client = self.aws_credentials.get_secrets_manager_client() + await run_sync_in_worker_thread( + client.delete_secret, + SecretId=self.secret_name, + RecoveryWindowInDays=recovery_window_in_days, + ForceDeleteWithoutRecovery=force_delete_without_recovery, + ) + return self.secret_name From f7778f7d099c72f70b776312700d50f414114eb3 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Wed, 18 Jan 2023 16:59:10 -0800 Subject: [PATCH 3/9] Add secret manager block --- README.md | 16 +++++ prefect_aws/s3.py | 2 +- prefect_aws/secrets_manager.py | 121 ++++++++++++++++++++++----------- tests/test_secrets_manager.py | 38 +++++++++++ 4 files changed, 136 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index 4586dc21..4dbcd6af 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,22 @@ def example_flow(): example_flow() ``` +#### Write, read, and delete secret from AWS Secrets Manager +```python +from prefect import flow +from prefect_aws import AwsCredentials, SecretsManager + +@flow +def example_flow(): + secrets_manager = SecretsManager.load("my-block") + secrets_manager.write_secret("my-secret-value") + secret = secrets_manager.read_secret() + print(secret) + secrets_manager.delete_secret() + +example_flow() +``` + #### Use `with_options` to customize options on any existing task or flow ```python diff --git a/prefect_aws/s3.py b/prefect_aws/s3.py index 44f92956..875aed74 100644 --- a/prefect_aws/s3.py +++ b/prefect_aws/s3.py @@ -963,7 +963,7 @@ async def upload_from_path( @sync_compatible async def upload_from_file_object( - self, from_file_object: BinaryIO, to_path: str, **upload_kwargs + self, from_file_object: BinaryIO, to_path: str, **upload_kwargs: Dict[str, Any] ) -> str: """ Uploads an object to the S3 bucket from a file-like object, diff --git a/prefect_aws/secrets_manager.py b/prefect_aws/secrets_manager.py index bfff2d53..4c9b0537 100644 --- 
a/prefect_aws/secrets_manager.py +++ b/prefect_aws/secrets_manager.py @@ -1,10 +1,10 @@ """Tasks for interacting with AWS Secrets Manager""" -from typing import Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union from botocore.exceptions import ClientError from prefect import get_run_logger, task from prefect.blocks.abstract import SecretBlock -from prefect.utilities.asyncutils import run_sync_in_worker_thread +from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible from pydantic import Field from prefect_aws import AwsCredentials, MinIOCredentials @@ -362,9 +362,13 @@ class SecretsManager(SecretBlock): aws_credentials: Union[AwsCredentials, MinIOCredentials] secret_name: str = Field(default=..., description="The name of the secret.") + @sync_compatible async def read_secret( - self, version_id: str = None, version_stage: str = None - ) -> Union[str, bytes]: + self, + version_id: str = None, + version_stage: str = None, + **read_kwargs: Dict[str, Any], + ) -> bytes: """ Reads the secret from the secret storage service. @@ -373,61 +377,83 @@ async def read_secret( version will be read. version_stage: The version stage of the secret to read. If not provided, the latest version will be read. + read_kwargs: Additional keyword arguments to pass to the + `get_secret_value` method of the boto3 client. Returns: The secret data. + + Examples: + Reads a secret. + ```python + secrets_manager = SecretsManager.load("MY_BLOCK") + secrets_manager.read_secret() + ``` """ client = self.aws_credentials.get_secrets_manager_client() + if version_id is not None: + read_kwargs["VersionId"] = version_id + if version_stage is not None: + read_kwargs["VersionStage"] = version_stage response = await run_sync_in_worker_thread( - client.get_secret_value, - SecretId=self.secret_name, - VersionId=version_id, - version_stage=version_stage, + client.get_secret_value, SecretId=self.secret_name, **read_kwargs ) - secret = response.get("SecretString") or response.get("SecretBinary") + secret = response["SecretBinary"] + arn = response["ARN"] + self.logger.info(f"The secret {arn!r} data was successfully read.") return secret - async def write_secret(self, secret_data: bytes) -> str: + @sync_compatible + async def write_secret( + self, secret_data: bytes, **put_or_create_secret_kwargs: Dict[str, Any] + ) -> str: """ - Writes the secret to the secret storage service. + Writes the secret to the secret storage service as a SecretBinary; + if it doesn't exist, it will be created. Args: secret_data: The secret data to write. + **put_or_create_secret_kwargs: Additional keyword arguments to pass to + put_secret_value or create_secret method of the boto3 client. Returns: The path that the secret was written to. - """ - client = self.aws_credentials.get_secrets_manager_client() - response = await run_sync_in_worker_thread( - client.put_secret_value, SecretId=self.secret_name, SecretBinary=secret_data - ) - return response["name"] - - async def update_secret( - self, secret_data: bytes, description: Optional[str] = None - ) -> str: - """ - Updates the secret to the secret storage service. - - Args: - secret_data: The secret data to update. - Returns: - The path that the secret was updated to. + Examples: + Write some secret data. 
+ ```python + secrets_manager = SecretsManager.load("MY_BLOCK") + secrets_manager.write_secret(b"my_secret_data") + ``` """ client = self.aws_credentials.get_secrets_manager_client() - response = await run_sync_in_worker_thread( - client.update_secret, - SecretId=self.secret_name, - SecretBinary=secret_data, - Description=description, - ) - return response["name"] + try: + response = await run_sync_in_worker_thread( + client.put_secret_value, + SecretId=self.secret_name, + SecretBinary=secret_data, + **put_or_create_secret_kwargs, + ) + except client.exceptions.ResourceNotFoundException: + self.logger.info( + f"The secret {self.secret_name!r} does not exist yet, creating it now." + ) + response = await run_sync_in_worker_thread( + client.create_secret, + Name=self.secret_name, + SecretBinary=secret_data, + **put_or_create_secret_kwargs, + ) + arn = response["ARN"] + self.logger.info(f"The secret data was written successfully to {arn!r}.") + return arn + @sync_compatible async def delete_secret( self, - recovery_window_in_days: Optional[int] = None, + recovery_window_in_days: int = 30, force_delete_without_recovery: bool = False, + **delete_kwargs: Dict[str, Any], ) -> str: """ Deletes the secret from the secret storage service. @@ -437,22 +463,37 @@ async def delete_secret( deleting the secret. Must be between 7 and 30 days. force_delete_without_recovery: If True, the secret will be deleted immediately without a recovery window. + **delete_kwargs: Additional keyword arguments to pass to the + delete_secret method of the boto3 client. Returns: The path that the secret was deleted from. + + Examples: + Deletes the secret with a recovery window of 15 days. + ```python + secrets_manager = SecretsManager.load("MY_BLOCK") + secrets_manager.delete_secret(recovery_window_in_days=15) + ``` """ if force_delete_without_recovery and recovery_window_in_days: raise ValueError( "Cannot specify recovery window and force delete without recovery." ) - elif 7 <= recovery_window_in_days <= 30: - raise ValueError("Recovery window must be between 7 and 30 days.") + elif not (7 <= recovery_window_in_days <= 30): + raise ValueError( + f"Recovery window must be between 7 and 30 days, got " + f"{recovery_window_in_days}." 
+ ) client = self.aws_credentials.get_secrets_manager_client() - await run_sync_in_worker_thread( + response = await run_sync_in_worker_thread( client.delete_secret, SecretId=self.secret_name, RecoveryWindowInDays=recovery_window_in_days, ForceDeleteWithoutRecovery=force_delete_without_recovery, + **delete_kwargs, ) - return self.secret_name + arn = response["ARN"] + self.logger.info(f"The secret {arn} was deleted successfully.") + return arn diff --git a/tests/test_secrets_manager.py b/tests/test_secrets_manager.py index af607f15..9ac250b3 100644 --- a/tests/test_secrets_manager.py +++ b/tests/test_secrets_manager.py @@ -6,6 +6,7 @@ from prefect import flow from prefect_aws.secrets_manager import ( + SecretsManager, create_secret, delete_secret, read_secret, @@ -161,3 +162,40 @@ async def test_flow(): ) else: assert deletion_date.date() == datetime.utcnow().date() + + +class TestSecretsManager: + @pytest.fixture + def secrets_manager(self, aws_credentials, secretsmanager_client): + yield SecretsManager(aws_credentials=aws_credentials, secret_name="my-test") + + def test_roundtrip_read_write_delete(self, secrets_manager): + arn = "arn:aws:secretsmanager:us-east-1:123456789012:secret" + assert secrets_manager.write_secret("my-secret").startswith(arn) + assert secrets_manager.read_secret() == b"my-secret" + assert secrets_manager.write_secret("my-updated-secret").startswith(arn) + assert secrets_manager.read_secret() == b"my-updated-secret" + assert secrets_manager.delete_secret().startswith(arn) + + def test_read_secret_version_id(self, secrets_manager: SecretsManager): + client = secrets_manager.aws_credentials.get_secrets_manager_client() + client.create_secret(Name="my-test", SecretBinary="my-secret") + response = client.update_secret( + SecretId="my-test", SecretBinary="my-updated-secret" + ) + assert ( + secrets_manager.read_secret(version_id=response["VersionId"]) + == b"my-updated-secret" + ) + + def test_delete_secret_conflict(self, secrets_manager: SecretsManager): + with pytest.raises(ValueError, match="Cannot specify recovery window"): + secrets_manager.delete_secret( + force_delete_without_recovery=True, recovery_window_in_days=10 + ) + + def test_delete_secret_recovery_window(self, secrets_manager: SecretsManager): + with pytest.raises( + ValueError, match="Recovery window must be between 7 and 30 days" + ): + secrets_manager.delete_secret(recovery_window_in_days=42) From df86f79af869e1dc32b16e9aea4d764324cdff88 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Wed, 18 Jan 2023 17:01:28 -0800 Subject: [PATCH 4/9] Add changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f00cc02e..8804374d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `SecretsManager` block with `read_secret`, `write_secret`, and `delete_secret` methods - [#176](https://github.com/PrefectHQ/prefect-aws/pull/176) + ### Changed - Object keys sent in S3 requests use '/' delimiters instead of system default - [#192](https://github.com/PrefectHQ/prefect-aws/pull/192) From 98ea878074409a0dcf9ad0f54cd7c8e1a8a62a9a Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Wed, 18 Jan 2023 17:03:12 -0800 Subject: [PATCH 5/9] Add missing attrs --- prefect_aws/secrets_manager.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/prefect_aws/secrets_manager.py b/prefect_aws/secrets_manager.py index 4c9b0537..8ab7a225 100644 --- a/prefect_aws/secrets_manager.py +++ 
b/prefect_aws/secrets_manager.py @@ -357,6 +357,10 @@ def example_delete_secret_with_recovery_window(): class SecretsManager(SecretBlock): """ Block that represents a resource that can store and retrieve secrets. + + Attributes: + aws_credentials: The credentials to use for authentication with AWS. + secret_name: The name of the secret. """ aws_credentials: Union[AwsCredentials, MinIOCredentials] From 7ac1e1359448b0df5c692ce57842a9c064a9de1b Mon Sep 17 00:00:00 2001 From: Andrew <15331990+ahuang11@users.noreply.github.com> Date: Fri, 20 Jan 2023 09:43:17 -0800 Subject: [PATCH 6/9] Apply suggestions from code review Co-authored-by: Alexander Streed --- prefect_aws/secrets_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/prefect_aws/secrets_manager.py b/prefect_aws/secrets_manager.py index 8ab7a225..1927513f 100644 --- a/prefect_aws/secrets_manager.py +++ b/prefect_aws/secrets_manager.py @@ -354,16 +354,16 @@ def example_delete_secret_with_recovery_window(): raise -class SecretsManager(SecretBlock): +class AwsSecret(SecretBlock): """ - Block that represents a resource that can store and retrieve secrets. + Manages a secret in AWS's Secrets Manager. Attributes: aws_credentials: The credentials to use for authentication with AWS. secret_name: The name of the secret. """ - aws_credentials: Union[AwsCredentials, MinIOCredentials] + aws_credentials: AwsCredentials secret_name: str = Field(default=..., description="The name of the secret.") @sync_compatible From 346a8d7d72cecd88068d68bff2d802a5f036635c Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Fri, 20 Jan 2023 09:44:36 -0800 Subject: [PATCH 7/9] Remove get secret client --- prefect_aws/credentials.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/prefect_aws/credentials.py b/prefect_aws/credentials.py index 20015d81..4726da2a 100644 --- a/prefect_aws/credentials.py +++ b/prefect_aws/credentials.py @@ -241,12 +241,3 @@ def get_s3_client(self) -> S3Client: An authenticated S3 client. """ return self.get_client(client_type=ClientType.S3) - - def get_secrets_manager_client(self) -> SecretsManagerClient: - """ - Gets an authenticated Secrets Manager client. - - Returns: - An authenticated Secrets Manager client. 
- """ - return self.get_client(client_type=ClientType.SECRETS_MANAGER) From ead218736d28c9671df56e290fbd68428f91f4d7 Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Fri, 20 Jan 2023 09:45:13 -0800 Subject: [PATCH 8/9] Remove import --- prefect_aws/secrets_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prefect_aws/secrets_manager.py b/prefect_aws/secrets_manager.py index 1927513f..5bbf8495 100644 --- a/prefect_aws/secrets_manager.py +++ b/prefect_aws/secrets_manager.py @@ -7,7 +7,7 @@ from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible from pydantic import Field -from prefect_aws import AwsCredentials, MinIOCredentials +from prefect_aws import AwsCredentials @task From 25a60960e2da6241eae72466f14380e2109b5d4d Mon Sep 17 00:00:00 2001 From: ahuang11 Date: Fri, 20 Jan 2023 09:49:28 -0800 Subject: [PATCH 9/9] Fix tests --- tests/test_secrets_manager.py | 36 +++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/tests/test_secrets_manager.py b/tests/test_secrets_manager.py index 9ac250b3..03d0a08b 100644 --- a/tests/test_secrets_manager.py +++ b/tests/test_secrets_manager.py @@ -6,7 +6,7 @@ from prefect import flow from prefect_aws.secrets_manager import ( - SecretsManager, + AwsSecret, create_secret, delete_secret, read_secret, @@ -164,38 +164,38 @@ async def test_flow(): assert deletion_date.date() == datetime.utcnow().date() -class TestSecretsManager: +class TestAwsSecret: @pytest.fixture - def secrets_manager(self, aws_credentials, secretsmanager_client): - yield SecretsManager(aws_credentials=aws_credentials, secret_name="my-test") + def aws_secret(self, aws_credentials, secretsmanager_client): + yield AwsSecret(aws_credentials=aws_credentials, secret_name="my-test") - def test_roundtrip_read_write_delete(self, secrets_manager): + def test_roundtrip_read_write_delete(self, aws_secret): arn = "arn:aws:secretsmanager:us-east-1:123456789012:secret" - assert secrets_manager.write_secret("my-secret").startswith(arn) - assert secrets_manager.read_secret() == b"my-secret" - assert secrets_manager.write_secret("my-updated-secret").startswith(arn) - assert secrets_manager.read_secret() == b"my-updated-secret" - assert secrets_manager.delete_secret().startswith(arn) - - def test_read_secret_version_id(self, secrets_manager: SecretsManager): - client = secrets_manager.aws_credentials.get_secrets_manager_client() + assert aws_secret.write_secret("my-secret").startswith(arn) + assert aws_secret.read_secret() == b"my-secret" + assert aws_secret.write_secret("my-updated-secret").startswith(arn) + assert aws_secret.read_secret() == b"my-updated-secret" + assert aws_secret.delete_secret().startswith(arn) + + def test_read_secret_version_id(self, aws_secret: AwsSecret): + client = aws_secret.aws_credentials.get_secrets_manager_client() client.create_secret(Name="my-test", SecretBinary="my-secret") response = client.update_secret( SecretId="my-test", SecretBinary="my-updated-secret" ) assert ( - secrets_manager.read_secret(version_id=response["VersionId"]) + aws_secret.read_secret(version_id=response["VersionId"]) == b"my-updated-secret" ) - def test_delete_secret_conflict(self, secrets_manager: SecretsManager): + def test_delete_secret_conflict(self, aws_secret: AwsSecret): with pytest.raises(ValueError, match="Cannot specify recovery window"): - secrets_manager.delete_secret( + aws_secret.delete_secret( force_delete_without_recovery=True, recovery_window_in_days=10 ) - def 
test_delete_secret_recovery_window(self, secrets_manager: SecretsManager): + def test_delete_secret_recovery_window(self, aws_secret: AwsSecret): with pytest.raises( ValueError, match="Recovery window must be between 7 and 30 days" ): - secrets_manager.delete_secret(recovery_window_in_days=42) + aws_secret.delete_secret(recovery_window_in_days=42)
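
A minimal usage sketch of the `AwsSecret` block as it stands at the end of this series, mirroring the write/read/delete round trip covered in `tests/test_secrets_manager.py`. The credential block name (`"my-aws-creds"`) and secret name (`"my-app-secret"`) below are placeholders, not values taken from this PR:

```python
# Illustrative sketch only: exercises the AwsSecret block added by this patch
# series. Block and secret names are placeholders.
from prefect import flow

from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import AwsSecret


@flow
def secret_round_trip():
    aws_secret = AwsSecret(
        # assumes an AwsCredentials block named "my-aws-creds" was saved earlier
        aws_credentials=AwsCredentials.load("my-aws-creds"),
        secret_name="my-app-secret",
    )
    # write_secret stores the payload as SecretBinary, creating the secret if it
    # does not exist yet, and returns the secret's ARN
    arn = aws_secret.write_secret(b"my-secret-data")
    # read_secret returns the SecretBinary payload of the latest version by default
    value = aws_secret.read_secret()
    # delete_secret schedules deletion; the recovery window must be 7-30 days
    aws_secret.delete_secret(recovery_window_in_days=7)
    return arn, value


if __name__ == "__main__":
    secret_round_trip()
```

Because the block's methods are decorated with `@sync_compatible`, the same calls can be awaited from an async flow or invoked synchronously as shown here.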