Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add secret manager block #176

Merged
merged 11 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `SecretsManager` block with `read_secret`, `write_secret`, and `delete_secret` methods - [#176](https://github.com/PrefectHQ/prefect-aws/pull/176)

### Changed

- Object keys sent in S3 requests use '/' delimiters instead of system default - [#192](https://github.com/PrefectHQ/prefect-aws/pull/192)
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,22 @@ def example_flow():
example_flow()
```

#### Write, read, and delete secret from AWS Secrets Manager
```python
from prefect import flow
from prefect_aws import AwsCredentials, SecretsManager

@flow
def example_flow():
secrets_manager = SecretsManager.load("my-block")
secrets_manager.write_secret("my-secret-value")
secret = secrets_manager.read_secret()
print(secret)
secrets_manager.delete_secret()

example_flow()
```

#### Use `with_options` to customize options on any existing task or flow

```python
Expand Down
10 changes: 10 additions & 0 deletions prefect_aws/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import boto3
from mypy_boto3_s3 import S3Client
from mypy_boto3_secretsmanager import SecretsManagerClient
from prefect.blocks.abstract import CredentialsBlock
from pydantic import Field, SecretStr

Expand Down Expand Up @@ -128,6 +129,15 @@ def get_s3_client(self) -> S3Client:
"""
return self.get_client(client_type=ClientType.S3)

def get_secrets_manager_client(self) -> SecretsManagerClient:
"""
Gets an authenticated Secrets Manager client.

Returns:
An authenticated Secrets Manager client.
"""
return self.get_client(client_type=ClientType.SECRETS_MANAGER)


class MinIOCredentials(CredentialsBlock):
"""
Expand Down
4 changes: 3 additions & 1 deletion prefect_aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ async def download_object_to_file_object(
Download my_folder/notes.txt object to a BytesIO object.
```python
from io import BytesIO

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
Expand Down Expand Up @@ -962,7 +963,7 @@ async def upload_from_path(

@sync_compatible
async def upload_from_file_object(
self, from_file_object: BinaryIO, to_path: str, **upload_kwargs
self, from_file_object: BinaryIO, to_path: str, **upload_kwargs: Dict[str, Any]
) -> str:
"""
Uploads an object to the S3 bucket from a file-like object,
Expand All @@ -981,6 +982,7 @@ async def upload_from_file_object(
Upload BytesIO object to my_folder/notes.txt.
```python
from io import BytesIO

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
Expand Down
155 changes: 153 additions & 2 deletions prefect_aws/secrets_manager.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Tasks for interacting with AWS Secrets Manager"""
from typing import Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union

from botocore.exceptions import ClientError
from prefect import get_run_logger, task
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect.blocks.abstract import SecretBlock
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
from pydantic import Field

from prefect_aws import AwsCredentials

Expand Down Expand Up @@ -350,3 +352,152 @@ def example_delete_secret_with_recovery_window():
except ClientError:
logger.exception("Unable to delete secret %s", secret_name)
raise


class AwsSecret(SecretBlock):
"""
Manages a secret in AWS's Secrets Manager.

Attributes:
aws_credentials: The credentials to use for authentication with AWS.
secret_name: The name of the secret.
"""

aws_credentials: AwsCredentials
secret_name: str = Field(default=..., description="The name of the secret.")

@sync_compatible
async def read_secret(
self,
version_id: str = None,
version_stage: str = None,
**read_kwargs: Dict[str, Any],
) -> bytes:
"""
Reads the secret from the secret storage service.

Args:
version_id: The version of the secret to read. If not provided, the latest
version will be read.
version_stage: The version stage of the secret to read. If not provided,
the latest version will be read.
read_kwargs: Additional keyword arguments to pass to the
`get_secret_value` method of the boto3 client.

Returns:
The secret data.

Examples:
Reads a secret.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.read_secret()
```
"""
client = self.aws_credentials.get_secrets_manager_client()
if version_id is not None:
read_kwargs["VersionId"] = version_id
if version_stage is not None:
read_kwargs["VersionStage"] = version_stage
response = await run_sync_in_worker_thread(
client.get_secret_value, SecretId=self.secret_name, **read_kwargs
)
secret = response["SecretBinary"]
arn = response["ARN"]
self.logger.info(f"The secret {arn!r} data was successfully read.")
return secret

@sync_compatible
async def write_secret(
self, secret_data: bytes, **put_or_create_secret_kwargs: Dict[str, Any]
) -> str:
"""
Writes the secret to the secret storage service as a SecretBinary;
if it doesn't exist, it will be created.

Args:
secret_data: The secret data to write.
**put_or_create_secret_kwargs: Additional keyword arguments to pass to
put_secret_value or create_secret method of the boto3 client.

Returns:
The path that the secret was written to.

Examples:
Write some secret data.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.write_secret(b"my_secret_data")
```
"""
client = self.aws_credentials.get_secrets_manager_client()
try:
response = await run_sync_in_worker_thread(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does run_sync_in_worker_thread work outside of a flow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it works:

    call = partial(__fn, *args, **kwargs)
    return await anyio.to_thread.run_sync(
        call, cancellable=True, limiter=get_thread_limiter()
    )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import anyio
def test():
    print("Hello")
await anyio.to_thread.run_sync(test, cancellable=True)

client.put_secret_value,
SecretId=self.secret_name,
SecretBinary=secret_data,
**put_or_create_secret_kwargs,
)
except client.exceptions.ResourceNotFoundException:
self.logger.info(
f"The secret {self.secret_name!r} does not exist yet, creating it now."
)
response = await run_sync_in_worker_thread(
client.create_secret,
Name=self.secret_name,
SecretBinary=secret_data,
**put_or_create_secret_kwargs,
)
arn = response["ARN"]
self.logger.info(f"The secret data was written successfully to {arn!r}.")
return arn

@sync_compatible
async def delete_secret(
self,
recovery_window_in_days: int = 30,
force_delete_without_recovery: bool = False,
**delete_kwargs: Dict[str, Any],
) -> str:
"""
Deletes the secret from the secret storage service.

Args:
recovery_window_in_days: The number of days to wait before permanently
deleting the secret. Must be between 7 and 30 days.
force_delete_without_recovery: If True, the secret will be deleted
immediately without a recovery window.
**delete_kwargs: Additional keyword arguments to pass to the
delete_secret method of the boto3 client.

Returns:
The path that the secret was deleted from.

Examples:
Deletes the secret with a recovery window of 15 days.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.delete_secret(recovery_window_in_days=15)
```
"""
if force_delete_without_recovery and recovery_window_in_days:
raise ValueError(
"Cannot specify recovery window and force delete without recovery."
)
elif not (7 <= recovery_window_in_days <= 30):
raise ValueError(
f"Recovery window must be between 7 and 30 days, got "
f"{recovery_window_in_days}."
)

client = self.aws_credentials.get_secrets_manager_client()
response = await run_sync_in_worker_thread(
client.delete_secret,
SecretId=self.secret_name,
RecoveryWindowInDays=recovery_window_in_days,
ForceDeleteWithoutRecovery=force_delete_without_recovery,
**delete_kwargs,
)
arn = response["ARN"]
self.logger.info(f"The secret {arn} was deleted successfully.")
return arn
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ boto3>=1.24.53
botocore>=1.27.53
prefect>=2.7
mypy_boto3_s3>=1.24.94
mypy_boto3_secretsmanager>=1.26.49
38 changes: 38 additions & 0 deletions tests/test_secrets_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from prefect import flow

from prefect_aws.secrets_manager import (
AwsSecret,
create_secret,
delete_secret,
read_secret,
Expand Down Expand Up @@ -161,3 +162,40 @@ async def test_flow():
)
else:
assert deletion_date.date() == datetime.utcnow().date()


class TestAwsSecret:
@pytest.fixture
def aws_secret(self, aws_credentials, secretsmanager_client):
yield AwsSecret(aws_credentials=aws_credentials, secret_name="my-test")

def test_roundtrip_read_write_delete(self, aws_secret):
arn = "arn:aws:secretsmanager:us-east-1:123456789012:secret"
assert aws_secret.write_secret("my-secret").startswith(arn)
assert aws_secret.read_secret() == b"my-secret"
assert aws_secret.write_secret("my-updated-secret").startswith(arn)
assert aws_secret.read_secret() == b"my-updated-secret"
assert aws_secret.delete_secret().startswith(arn)

def test_read_secret_version_id(self, aws_secret: AwsSecret):
client = aws_secret.aws_credentials.get_secrets_manager_client()
client.create_secret(Name="my-test", SecretBinary="my-secret")
response = client.update_secret(
SecretId="my-test", SecretBinary="my-updated-secret"
)
assert (
aws_secret.read_secret(version_id=response["VersionId"])
== b"my-updated-secret"
)

def test_delete_secret_conflict(self, aws_secret: AwsSecret):
with pytest.raises(ValueError, match="Cannot specify recovery window"):
aws_secret.delete_secret(
force_delete_without_recovery=True, recovery_window_in_days=10
)

def test_delete_secret_recovery_window(self, aws_secret: AwsSecret):
with pytest.raises(
ValueError, match="Recovery window must be between 7 and 30 days"
):
aws_secret.delete_secret(recovery_window_in_days=42)