Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch helper methods - AWS SDK helpers #1187

Open
jplock opened this issue Apr 4, 2021 · 5 comments
Open

Batch helper methods - AWS SDK helpers #1187

jplock opened this issue Apr 4, 2021 · 5 comments
Labels
feature-request feature request need-customer-feedback Requires more customers feedback before making or revisiting a decision need-rfc

Comments

@jplock
Copy link

jplock commented Apr 4, 2021

Is your feature request related to a problem? Please describe.

Batch operation helper methods for SQS and DynamoDB

Describe the solution you'd like

Helper methods that will automatically chunk a list into specific sizes compatible with sqs SendMessageBatch and dynamodb BatchWriteItem

Describe alternatives you've considered

I end up rewriting this same functionality in multiple projects.

Additional context

@heitorlessa
Copy link
Contributor

Hey @jplock, thanks for the feature request! Could you share an example of what you have at the moment?

That will help clarifying whether this is best placed in the boto3 SDK, or if a helper method as part of SQS/DynamoDB data classes

Thanks!

@jplock
Copy link
Author

jplock commented Apr 5, 2021

from typing import List, Any

import boto3

# https://stackoverflow.com/a/1751478
def chunks(l, n):
    n = max(1, n)
    return (l[i:i+n] for i in range(0, len(l), n))

DYNAMODB_BATCH_WRITE_ITEM_LIMIT = 25
SQS_SEND_MESSAGE_BATCH_LIMIT = 10

def dynamodb_batch_write_item(table_name: str, items: List[Any]) -> None:
    dynamodb = boto3.client("dynamodb")

    for chunk in chunks(items, DYNAMODB_BATCH_WRITE_ITEM_LIMIT):
        if not chunk:
            continue
    
        params = {}
        params[table_name] = chunk

        dynamodb.batch_write_item(**params)

def sqs_send_message_batch(queue_url: str, messages: List[Any]) -> None:
    sqs = boto3.client("sqs")

    for chunk in chunks(messages, SQS_SEND_MESSAGE_BATCH_LIMIT):
        if not chunk:
            continue
    
        params = {
            "QueueUrl": queue_url,
            "Entries": chunk
        }

        sqs.send_message_batch(**params)

The DynamoDB Batch Write Item call gets more complicated since it can operate across multiple tables, so I'm not sure on the best interface for that, but hopefully this makes sense. Retry logic would also need to be added into the DynamoDB use-case to handle retrying any items returned in UnprocessedItems.

@heitorlessa heitorlessa transferred this issue from aws-powertools/powertools-lambda-python May 13, 2021
@heitorlessa
Copy link
Contributor

I've had a think about this. I'd love if anyone could create a RFC of a new SDK Helpers utility - There's surely many of this cross cutting pieces that don't make it to the SDK which some might make in time while others don't (like this).

An additional idea for a SDK Helper utility is automatically adjusting timeouts (TCP/HTTP) based on Lambda function duration (context).

There could be more to make this feasible so a RFC would give everyone an opportunity to chime in to make this more widely useful - Tagged as Pending/RFC before we consider it.

@heitorlessa heitorlessa changed the title Batch helper methods Batch helper methods - AWS SDK helpers May 31, 2021
@heitorlessa heitorlessa transferred this issue from aws-powertools/powertools-lambda Apr 28, 2022
@heitorlessa heitorlessa added need-customer-feedback Requires more customers feedback before making or revisiting a decision and removed python labels May 20, 2022
@jplock
Copy link
Author

jplock commented Jul 18, 2022

Here's a more flushed out example of a DynamoDB batch_write_item helper:

dynamodb = boto3.client("dynamodb")
max_retries = 15
min_sleep_time = 1e-2

@tracer.capture_method(capture_response=False)
def batch_write(items: Dict[str, Any], retry_count: int = 0) -> None:
    try:
        response = dynamodb.batch_write_item(RequestItems=items)
        metrics.add_metric(name="BatchWriteSuccess", unit=MetricUnit.Count, value=1)
    except botocore.exceptions.ClientError:
        logger.exception("Failed to write batch items to DynamoDB")
        metrics.add_metric(name="BatchWriteFailed", unit=MetricUnit.Count, value=1)
        raise

    unprocessed_items = response.get("UnprocessedItems", [])
    if unprocessed_items:
        if retry_count > max_retries:
            retry_count = random.randint(1, max_retries)
        sleep_time = min_sleep_time * random.randint(1, 2**retry_count)
        logger.warn(f"{len(unprocessed_items)} unprocessed items, sleeping for {sleep_time} seconds (retry count = {retry_count})")
        time.sleep(sleep_time)
        batch_write(unprocessed_items, retry_count + 1)

@anafalcao
Copy link
Collaborator

Related to aws-powertools/powertools-lambda-typescript#2196

If you are interested in this feature, please leave a thumbs up.

@anafalcao anafalcao added the feature-request feature request label Jan 22, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature-request feature request need-customer-feedback Requires more customers feedback before making or revisiting a decision need-rfc
Projects
Development

No branches or pull requests

3 participants