
Feature request: Batch helper methods - AWS SDK helpers

Open · jplock opened this issue 4 years ago · 5 comments

Is your feature request related to a problem? Please describe.

Batch operation helper methods for SQS and DynamoDB

Describe the solution you'd like

Helper methods that automatically chunk a list into batch sizes compatible with SQS SendMessageBatch and DynamoDB BatchWriteItem

Describe alternatives you've considered

I end up rewriting this same functionality in multiple projects.

Additional context

jplock avatar Apr 04 '21 18:04 jplock

Hey @jplock, thanks for the feature request! Could you share an example of what you have at the moment?

That will help clarify whether this is best placed in the boto3 SDK, or as a helper method in the SQS/DynamoDB data classes.

Thanks!

heitorlessa avatar Apr 04 '21 18:04 heitorlessa

from typing import List, Any

import boto3

# https://stackoverflow.com/a/1751478
def chunks(l, n):
    """Yield successive chunks of at most n items from list l."""
    n = max(1, n)
    return (l[i:i+n] for i in range(0, len(l), n))

DYNAMODB_BATCH_WRITE_ITEM_LIMIT = 25
SQS_SEND_MESSAGE_BATCH_LIMIT = 10

def dynamodb_batch_write_item(table_name: str, items: List[Any]) -> None:
    dynamodb = boto3.client("dynamodb")

    for chunk in chunks(items, DYNAMODB_BATCH_WRITE_ITEM_LIMIT):
        if not chunk:
            continue

        # BatchWriteItem expects RequestItems: a mapping of table name -> write requests
        dynamodb.batch_write_item(RequestItems={table_name: chunk})

def sqs_send_message_batch(queue_url: str, messages: List[Any]) -> None:
    sqs = boto3.client("sqs")

    for chunk in chunks(messages, SQS_SEND_MESSAGE_BATCH_LIMIT):
        if not chunk:
            continue

        # SendMessageBatch accepts at most 10 entries per call
        sqs.send_message_batch(QueueUrl=queue_url, Entries=chunk)
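
A quick usage sketch (the queue URL and message payloads are illustrative):

entries = [{"Id": str(i), "MessageBody": f"message {i}"} for i in range(25)]
# chunks() splits these into batches of 10, 10, and 5
sqs_send_message_batch("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", entries)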

The DynamoDB BatchWriteItem call gets more complicated since it can operate across multiple tables, so I'm not sure of the best interface for that, but hopefully this makes sense. Retry logic would also need to be added for the DynamoDB use case to handle retrying any items returned in UnprocessedItems.
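
For reference, a minimal sketch of the multi-table request shape (table, key, and attribute names are illustrative):

# BatchWriteItem can mix tables and request types in a single call
request_items = {
    "orders": [
        {"PutRequest": {"Item": {"pk": {"S": "order#1"}, "total": {"N": "42"}}}},
    ],
    "customers": [
        {"DeleteRequest": {"Key": {"pk": {"S": "customer#7"}}}},
    ],
}
response = dynamodb.batch_write_item(RequestItems=request_items)
# Anything DynamoDB could not process comes back in the same shape and should be retried
unprocessed = response.get("UnprocessedItems", {})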

jplock avatar Apr 05 '21 13:04 jplock

I've had a think about this. I'd love it if anyone could create an RFC for a new SDK Helpers utility. There are surely many of these cross-cutting pieces that don't make it into the SDK; some might in time, while others won't (like this one).

An additional idea for an SDK Helpers utility is automatically adjusting timeouts (TCP/HTTP) based on the remaining Lambda function duration (available from the context object).
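
A rough sketch of that idea, assuming the standard Lambda context object (the helper name and headroom value are illustrative, not an existing API):

import boto3
from botocore.config import Config

def client_with_deadline(service: str, context, headroom_ms: int = 500):
    # Leave headroom so the SDK call times out before the function itself does
    remaining_s = max((context.get_remaining_time_in_millis() - headroom_ms) / 1000, 1)
    return boto3.client(service, config=Config(connect_timeout=min(remaining_s, 5), read_timeout=remaining_s))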

There could be more needed to make this feasible, so an RFC would give everyone an opportunity to chime in and make this more widely useful. Tagged as Pending/RFC before we consider it.

heitorlessa avatar May 13 '21 13:05 heitorlessa

Here's a more fleshed-out example of a DynamoDB batch_write_item helper:

import random
import time
from typing import Any, Dict

import boto3
import botocore.exceptions
from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit

tracer = Tracer()
logger = Logger()
metrics = Metrics(namespace="Example")  # namespace illustrative

dynamodb = boto3.client("dynamodb")
max_retries = 15
min_sleep_time = 1e-2

@tracer.capture_method(capture_response=False)
def batch_write(items: Dict[str, Any], retry_count: int = 0) -> None:
    try:
        response = dynamodb.batch_write_item(RequestItems=items)
        metrics.add_metric(name="BatchWriteSuccess", unit=MetricUnit.Count, value=1)
    except botocore.exceptions.ClientError:
        logger.exception("Failed to write batch items to DynamoDB")
        metrics.add_metric(name="BatchWriteFailed", unit=MetricUnit.Count, value=1)
        raise

    # UnprocessedItems is a dict keyed by table name, in the same shape as RequestItems
    unprocessed_items = response.get("UnprocessedItems", {})
    if unprocessed_items:
        if retry_count > max_retries:
            # Cap the backoff by resetting to a random point within the window
            retry_count = random.randint(1, max_retries)
        # Exponential backoff with jitter
        sleep_time = min_sleep_time * random.randint(1, 2**retry_count)
        logger.warning(f"Unprocessed items remain for {len(unprocessed_items)} table(s), sleeping for {sleep_time} seconds (retry count = {retry_count})")
        time.sleep(sleep_time)
        batch_write(unprocessed_items, retry_count + 1)
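
Example invocation (table and item shapes illustrative); with min_sleep_time = 1e-2, the fifth retry sleeps at most 0.01 * 2**5 = 0.32 seconds:

batch_write({
    "my-table": [
        {"PutRequest": {"Item": {"pk": {"S": "order#1"}}}},
    ],
})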

jplock avatar Jul 18 '22 02:07 jplock