Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into use-s3-client-more-efficiently
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Jan 3, 2024
2 parents bcf2bed + c0a4f9c commit 4ec6a1e
Show file tree
Hide file tree
Showing 5 changed files with 456 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/lambda_function.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
description: Module handling AWS Lambda functions
notes: This documentation page is generated from source file docstrings.
---

::: prefect_aws.lambda_function
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ nav:
- Credentials: credentials.md
- ECS Worker: ecs_worker.md
- ECS: ecs.md
- Lambda: lambda_function.md
- Deployments:
- Steps: deployments/steps.md
- S3: s3.md
Expand Down
2 changes: 2 additions & 0 deletions prefect_aws/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from . import _version
from .credentials import AwsCredentials, MinIOCredentials
from .client_parameters import AwsClientParameters
from .lambda_function import LambdaFunction
from .s3 import S3Bucket
from .ecs import ECSTask
from .secrets_manager import AwsSecret
Expand All @@ -17,6 +18,7 @@
__all__ = [
"AwsCredentials",
"AwsClientParameters",
"LambdaFunction",
"MinIOCredentials",
"S3Bucket",
"ECSTask",
Expand Down
194 changes: 194 additions & 0 deletions prefect_aws/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""Integrations with AWS Lambda.
Examples:
Run a lambda function with a payload
```python
LambdaFunction(
function_name="test-function",
aws_credentials=aws_credentials,
).invoke(payload={"foo": "bar"})
```
Specify a version of a lambda function
```python
LambdaFunction(
function_name="test-function",
qualifier="1",
aws_credentials=aws_credentials,
).invoke()
```
Invoke a lambda function asynchronously
```python
LambdaFunction(
function_name="test-function",
aws_credentials=aws_credentials,
).invoke(invocation_type="Event")
```
Invoke a lambda function and return the last 4 KB of logs
```python
LambdaFunction(
function_name="test-function",
aws_credentials=aws_credentials,
).invoke(tail=True)
```
Invoke a lambda function with a client context
```python
LambdaFunction(
function_name="test-function",
aws_credentials=aws_credentials,
).invoke(client_context={"bar": "foo"})
```
"""
import json
from typing import Literal, Optional

from prefect.blocks.core import Block
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
from pydantic import VERSION as PYDANTIC_VERSION

if PYDANTIC_VERSION.startswith("2."):
from pydantic.v1 import Field
else:
from pydantic import Field

from prefect_aws.credentials import AwsCredentials


class LambdaFunction(Block):
"""Invoke a Lambda function. This block is part of the prefect-aws
collection. Install prefect-aws with `pip install prefect-aws` to use this
block.
Attributes:
function_name: The name, ARN, or partial ARN of the Lambda function to
run. This must be the name of a function that is already deployed
to AWS Lambda.
qualifier: The version or alias of the Lambda function to use when
invoked. If not specified, the latest (unqualified) version of the
Lambda function will be used.
aws_credentials: The AWS credentials to use to connect to AWS Lambda
with a default factory of AwsCredentials.
"""

_block_type_name = "Lambda Function"
_logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/d74b16fe84ce626345adf235a47008fea2869a60-225x225.png" # noqa
_documentation_url = "https://prefecthq.github.io/prefect-aws/s3/#prefect_aws.lambda_function.LambdaFunction" # noqa

function_name: str = Field(
title="Function Name",
description=(
"The name, ARN, or partial ARN of the Lambda function to run. This"
" must be the name of a function that is already deployed to AWS"
" Lambda."
),
)
qualifier: Optional[str] = Field(
default=None,
title="Qualifier",
description=(
"The version or alias of the Lambda function to use when invoked. "
"If not specified, the latest (unqualified) version of the Lambda "
"function will be used."
),
)
aws_credentials: AwsCredentials = Field(
title="AWS Credentials",
default_factory=AwsCredentials,
description="The AWS credentials to invoke the Lambda with.",
)

class Config:
"""Lambda's pydantic configuration."""

smart_union = True

def _get_lambda_client(self):
"""
Retrieve a boto3 session and Lambda client
"""
boto_session = self.aws_credentials.get_boto3_session()
lambda_client = boto_session.client("lambda")
return lambda_client

@sync_compatible
async def invoke(
self,
payload: dict = None,
invocation_type: Literal[
"RequestResponse", "Event", "DryRun"
] = "RequestResponse",
tail: bool = False,
client_context: Optional[dict] = None,
) -> dict:
"""
[Invoke](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/invoke.html)
the Lambda function with the given payload.
Args:
payload: The payload to send to the Lambda function.
invocation_type: The invocation type of the Lambda function. This
can be one of "RequestResponse", "Event", or "DryRun". Uses
"RequestResponse" by default.
tail: If True, the response will include the base64-encoded last 4
KB of log data produced by the Lambda function.
client_context: The client context to send to the Lambda function.
Limited to 3583 bytes.
Returns:
The response from the Lambda function.
Examples:
```python
from prefect_aws.lambda_function import LambdaFunction
from prefect_aws.credentials import AwsCredentials
credentials = AwsCredentials()
lambda_function = LambdaFunction(
function_name="test_lambda_function",
aws_credentials=credentials,
)
response = lambda_function.invoke(
payload={"foo": "bar"},
invocation_type="RequestResponse",
)
response["Payload"].read()
```
```txt
b'{"foo": "bar"}'
```
"""
# Add invocation arguments
kwargs = dict(FunctionName=self.function_name)

if payload:
kwargs["Payload"] = json.dumps(payload).encode()

# Let boto handle invalid invocation types
kwargs["InvocationType"] = invocation_type

if self.qualifier is not None:
kwargs["Qualifier"] = self.qualifier

if tail:
kwargs["LogType"] = "Tail"

if client_context is not None:
# For some reason this is string, but payload is bytes
kwargs["ClientContext"] = json.dumps(client_context)

# Get client and invoke
lambda_client = await run_sync_in_worker_thread(self._get_lambda_client)
return await run_sync_in_worker_thread(lambda_client.invoke, **kwargs)
Loading

0 comments on commit 4ec6a1e

Please sign in to comment.