Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add secret manager block #176

Merged
merged 11 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `SecretsManager` block with `read_secret`, `write_secret`, and `delete_secret` methods - [#176](https://github.com/PrefectHQ/prefect-aws/pull/176)

### Changed

- Object keys sent in S3 requests use '/' delimiters instead of system default - [#192](https://github.com/PrefectHQ/prefect-aws/pull/192)
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,22 @@ def example_flow():
example_flow()
```

#### Write, read, and delete secret from AWS Secrets Manager
```python
from prefect import flow
from prefect_aws import AwsCredentials, SecretsManager

@flow
def example_flow():
secrets_manager = SecretsManager.load("my-block")
secrets_manager.write_secret("my-secret-value")
secret = secrets_manager.read_secret()
print(secret)
secrets_manager.delete_secret()

example_flow()
```

#### Use `with_options` to customize options on any existing task or flow

```python
Expand Down
19 changes: 19 additions & 0 deletions prefect_aws/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import boto3
from mypy_boto3_s3 import S3Client
from mypy_boto3_secretsmanager import SecretsManagerClient
from prefect.blocks.abstract import CredentialsBlock
from pydantic import Field, SecretStr

Expand Down Expand Up @@ -128,6 +129,15 @@ def get_s3_client(self) -> S3Client:
"""
return self.get_client(client_type=ClientType.S3)

def get_secrets_manager_client(self) -> SecretsManagerClient:
"""
Gets an authenticated Secrets Manager client.

Returns:
An authenticated Secrets Manager client.
"""
return self.get_client(client_type=ClientType.SECRETS_MANAGER)


class MinIOCredentials(CredentialsBlock):
"""
Expand Down Expand Up @@ -231,3 +241,12 @@ def get_s3_client(self) -> S3Client:
An authenticated S3 client.
"""
return self.get_client(client_type=ClientType.S3)

def get_secrets_manager_client(self) -> SecretsManagerClient:
"""
Gets an authenticated Secrets Manager client.

Returns:
An authenticated Secrets Manager client.
"""
return self.get_client(client_type=ClientType.SECRETS_MANAGER)
ahuang11 marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 3 additions & 1 deletion prefect_aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ async def download_object_to_file_object(
Download my_folder/notes.txt object to a BytesIO object.
```python
from io import BytesIO

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
Expand Down Expand Up @@ -962,7 +963,7 @@ async def upload_from_path(

@sync_compatible
async def upload_from_file_object(
self, from_file_object: BinaryIO, to_path: str, **upload_kwargs
self, from_file_object: BinaryIO, to_path: str, **upload_kwargs: Dict[str, Any]
) -> str:
"""
Uploads an object to the S3 bucket from a file-like object,
Expand All @@ -981,6 +982,7 @@ async def upload_from_file_object(
Upload BytesIO object to my_folder/notes.txt.
```python
from io import BytesIO

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
Expand Down
157 changes: 154 additions & 3 deletions prefect_aws/secrets_manager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Tasks for interacting with AWS Secrets Manager"""
from typing import Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union

from botocore.exceptions import ClientError
from prefect import get_run_logger, task
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect.blocks.abstract import SecretBlock
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
from pydantic import Field

from prefect_aws import AwsCredentials
from prefect_aws import AwsCredentials, MinIOCredentials


@task
Expand Down Expand Up @@ -350,3 +352,152 @@ def example_delete_secret_with_recovery_window():
except ClientError:
logger.exception("Unable to delete secret %s", secret_name)
raise


class SecretsManager(SecretBlock):
ahuang11 marked this conversation as resolved.
Show resolved Hide resolved
"""
Block that represents a resource that can store and retrieve secrets.
ahuang11 marked this conversation as resolved.
Show resolved Hide resolved

Attributes:
aws_credentials: The credentials to use for authentication with AWS.
secret_name: The name of the secret.
"""

aws_credentials: Union[AwsCredentials, MinIOCredentials]
ahuang11 marked this conversation as resolved.
Show resolved Hide resolved
secret_name: str = Field(default=..., description="The name of the secret.")

@sync_compatible
async def read_secret(
self,
version_id: str = None,
version_stage: str = None,
**read_kwargs: Dict[str, Any],
) -> bytes:
"""
Reads the secret from the secret storage service.

Args:
version_id: The version of the secret to read. If not provided, the latest
version will be read.
version_stage: The version stage of the secret to read. If not provided,
the latest version will be read.
read_kwargs: Additional keyword arguments to pass to the
`get_secret_value` method of the boto3 client.

Returns:
The secret data.

Examples:
Reads a secret.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.read_secret()
```
"""
client = self.aws_credentials.get_secrets_manager_client()
if version_id is not None:
read_kwargs["VersionId"] = version_id
if version_stage is not None:
read_kwargs["VersionStage"] = version_stage
response = await run_sync_in_worker_thread(
client.get_secret_value, SecretId=self.secret_name, **read_kwargs
)
secret = response["SecretBinary"]
arn = response["ARN"]
self.logger.info(f"The secret {arn!r} data was successfully read.")
return secret

@sync_compatible
async def write_secret(
self, secret_data: bytes, **put_or_create_secret_kwargs: Dict[str, Any]
) -> str:
"""
Writes the secret to the secret storage service as a SecretBinary;
if it doesn't exist, it will be created.

Args:
secret_data: The secret data to write.
**put_or_create_secret_kwargs: Additional keyword arguments to pass to
put_secret_value or create_secret method of the boto3 client.

Returns:
The path that the secret was written to.

Examples:
Write some secret data.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.write_secret(b"my_secret_data")
```
"""
client = self.aws_credentials.get_secrets_manager_client()
try:
response = await run_sync_in_worker_thread(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does run_sync_in_worker_thread work outside of a flow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it works:

    call = partial(__fn, *args, **kwargs)
    return await anyio.to_thread.run_sync(
        call, cancellable=True, limiter=get_thread_limiter()
    )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import anyio
def test():
    print("Hello")
await anyio.to_thread.run_sync(test, cancellable=True)

client.put_secret_value,
SecretId=self.secret_name,
SecretBinary=secret_data,
**put_or_create_secret_kwargs,
)
except client.exceptions.ResourceNotFoundException:
self.logger.info(
f"The secret {self.secret_name!r} does not exist yet, creating it now."
)
response = await run_sync_in_worker_thread(
client.create_secret,
Name=self.secret_name,
SecretBinary=secret_data,
**put_or_create_secret_kwargs,
)
arn = response["ARN"]
self.logger.info(f"The secret data was written successfully to {arn!r}.")
return arn

@sync_compatible
async def delete_secret(
self,
recovery_window_in_days: int = 30,
force_delete_without_recovery: bool = False,
**delete_kwargs: Dict[str, Any],
) -> str:
"""
Deletes the secret from the secret storage service.

Args:
recovery_window_in_days: The number of days to wait before permanently
deleting the secret. Must be between 7 and 30 days.
force_delete_without_recovery: If True, the secret will be deleted
immediately without a recovery window.
**delete_kwargs: Additional keyword arguments to pass to the
delete_secret method of the boto3 client.

Returns:
The path that the secret was deleted from.

Examples:
Deletes the secret with a recovery window of 15 days.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.delete_secret(recovery_window_in_days=15)
```
"""
if force_delete_without_recovery and recovery_window_in_days:
raise ValueError(
"Cannot specify recovery window and force delete without recovery."
)
elif not (7 <= recovery_window_in_days <= 30):
raise ValueError(
f"Recovery window must be between 7 and 30 days, got "
f"{recovery_window_in_days}."
)

client = self.aws_credentials.get_secrets_manager_client()
response = await run_sync_in_worker_thread(
client.delete_secret,
SecretId=self.secret_name,
RecoveryWindowInDays=recovery_window_in_days,
ForceDeleteWithoutRecovery=force_delete_without_recovery,
**delete_kwargs,
)
arn = response["ARN"]
self.logger.info(f"The secret {arn} was deleted successfully.")
return arn
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ boto3>=1.24.53
botocore>=1.27.53
prefect>=2.7
mypy_boto3_s3>=1.24.94
mypy_boto3_secretsmanager>=1.26.49
38 changes: 38 additions & 0 deletions tests/test_secrets_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from prefect import flow

from prefect_aws.secrets_manager import (
SecretsManager,
create_secret,
delete_secret,
read_secret,
Expand Down Expand Up @@ -161,3 +162,40 @@ async def test_flow():
)
else:
assert deletion_date.date() == datetime.utcnow().date()


class TestSecretsManager:
@pytest.fixture
def secrets_manager(self, aws_credentials, secretsmanager_client):
yield SecretsManager(aws_credentials=aws_credentials, secret_name="my-test")

def test_roundtrip_read_write_delete(self, secrets_manager):
arn = "arn:aws:secretsmanager:us-east-1:123456789012:secret"
assert secrets_manager.write_secret("my-secret").startswith(arn)
assert secrets_manager.read_secret() == b"my-secret"
assert secrets_manager.write_secret("my-updated-secret").startswith(arn)
assert secrets_manager.read_secret() == b"my-updated-secret"
assert secrets_manager.delete_secret().startswith(arn)

def test_read_secret_version_id(self, secrets_manager: SecretsManager):
client = secrets_manager.aws_credentials.get_secrets_manager_client()
client.create_secret(Name="my-test", SecretBinary="my-secret")
response = client.update_secret(
SecretId="my-test", SecretBinary="my-updated-secret"
)
assert (
secrets_manager.read_secret(version_id=response["VersionId"])
== b"my-updated-secret"
)

def test_delete_secret_conflict(self, secrets_manager: SecretsManager):
with pytest.raises(ValueError, match="Cannot specify recovery window"):
secrets_manager.delete_secret(
force_delete_without_recovery=True, recovery_window_in_days=10
)

def test_delete_secret_recovery_window(self, secrets_manager: SecretsManager):
with pytest.raises(
ValueError, match="Recovery window must be between 7 and 30 days"
):
secrets_manager.delete_secret(recovery_window_in_days=42)