Feature request: Batch helper methods - AWS SDK helpers
Is your feature request related to a problem? Please describe.
Batch operation helper methods for SQS and DynamoDB
Describe the solution you'd like
Helper methods that will automatically chunk a list into sizes compatible with SQS `SendMessageBatch` and DynamoDB `BatchWriteItem`
Describe alternatives you've considered
I end up rewriting this same functionality in multiple projects.
Additional context
Hey @jplock, thanks for the feature request! Could you share an example of what you have at the moment?
That will help clarify whether this is best placed in the boto3 SDK, or whether a helper method as part of the SQS/DynamoDB data classes would be a better fit.
Thanks!
```python
from typing import Any, List

import boto3

# https://stackoverflow.com/a/1751478
def chunks(items: List[Any], n: int):
    n = max(1, n)
    return (items[i:i + n] for i in range(0, len(items), n))

DYNAMODB_BATCH_WRITE_ITEM_LIMIT = 25
SQS_SEND_MESSAGE_BATCH_LIMIT = 10

def dynamodb_batch_write_item(table_name: str, items: List[Any]) -> None:
    dynamodb = boto3.client("dynamodb")
    for chunk in chunks(items, DYNAMODB_BATCH_WRITE_ITEM_LIMIT):
        if not chunk:
            continue
        # batch_write_item takes RequestItems={table_name: [write requests]}
        dynamodb.batch_write_item(RequestItems={table_name: chunk})

def sqs_send_message_batch(queue_url: str, messages: List[Any]) -> None:
    sqs = boto3.client("sqs")
    for chunk in chunks(messages, SQS_SEND_MESSAGE_BATCH_LIMIT):
        if not chunk:
            continue
        sqs.send_message_batch(QueueUrl=queue_url, Entries=chunk)
```
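As a quick local sanity check of the chunking behaviour (no AWS calls; the helper and constant are restated from the snippet above), 25 SQS entries with a batch limit of 10 split into batches of 10, 10 and 5:

```python
# Standalone restatement of the chunking helper for a local check.
def chunks(items, n):
    n = max(1, n)
    return (items[i:i + n] for i in range(0, len(items), n))

SQS_SEND_MESSAGE_BATCH_LIMIT = 10

# SendMessageBatch entries need an "Id" unique within the batch
# plus a "MessageBody".
entries = [{"Id": str(i), "MessageBody": f"msg {i}"} for i in range(25)]
batches = list(chunks(entries, SQS_SEND_MESSAGE_BATCH_LIMIT))
# → 3 batches of sizes 10, 10, 5
```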
The DynamoDB BatchWriteItem call gets more complicated since it can operate across multiple tables, so I'm not sure of the best interface for that, but hopefully this makes sense. Retry logic would also need to be added for the DynamoDB use case, to retry any items returned in UnprocessedItems.
I've had a think about this. I'd love it if someone could create an RFC for a new SDK Helpers utility - there are surely many of these cross-cutting pieces that don't make it into the SDK, some of which might land in time while others won't (like this one).
An additional idea for an SDK Helper utility is automatically adjusting timeouts (TCP/HTTP) based on the Lambda function's remaining duration (from the context object).
There could be more to make this feasible, so an RFC would give everyone an opportunity to chime in and make it more widely useful - tagged as Pending/RFC before we consider it.
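To make the timeout idea concrete, here's a minimal sketch under stated assumptions: the function name and parameters are hypothetical, and the result would presumably be fed into something like `botocore.config.Config(connect_timeout=..., read_timeout=...)` when building the client.

```python
def derive_timeout_seconds(remaining_ms: int, floor: float = 1.0, headroom: float = 1.0) -> float:
    # Hypothetical helper: remaining_ms would come from
    # context.get_remaining_time_in_millis(). Leave some headroom so the
    # SDK call times out before Lambda kills the function, but never
    # drop below a sane floor.
    return max(floor, remaining_ms / 1000.0 - headroom)

# e.g. 30s left → 29.0s timeout; 500ms left → clamped to the 1.0s floor
```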
Here's a more fleshed-out example of a DynamoDB batch_write_item helper:
```python
import random
import time
from typing import Any, Dict

import boto3
import botocore.exceptions
from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit

logger = Logger()
metrics = Metrics()
tracer = Tracer()

dynamodb = boto3.client("dynamodb")

max_retries = 15
min_sleep_time = 1e-2  # seconds

@tracer.capture_method(capture_response=False)
def batch_write(items: Dict[str, Any], retry_count: int = 0) -> None:
    try:
        response = dynamodb.batch_write_item(RequestItems=items)
        metrics.add_metric(name="BatchWriteSuccess", unit=MetricUnit.Count, value=1)
    except botocore.exceptions.ClientError:
        logger.exception("Failed to write batch items to DynamoDB")
        metrics.add_metric(name="BatchWriteFailed", unit=MetricUnit.Count, value=1)
        raise

    # UnprocessedItems is a dict keyed by table name, same shape as RequestItems
    unprocessed_items = response.get("UnprocessedItems", {})
    if unprocessed_items:
        # Cap the exponent so the jittered backoff stays bounded
        if retry_count > max_retries:
            retry_count = random.randint(1, max_retries)
        sleep_time = min_sleep_time * random.randint(1, 2 ** retry_count)
        logger.warning(f"{len(unprocessed_items)} unprocessed items, sleeping for {sleep_time} seconds (retry count = {retry_count})")
        time.sleep(sleep_time)
        batch_write(unprocessed_items, retry_count + 1)
```
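The capped, jittered backoff embedded in that helper can be factored out into a pure function, which makes it testable on its own. A standalone sketch of just that calculation (constants copied from the snippet above; the function name is made up here):

```python
import random

MAX_RETRIES = 15
MIN_SLEEP_TIME = 1e-2  # seconds

def backoff_sleep_time(retry_count: int) -> float:
    # Cap the exponent so the sleep time stays bounded even after many
    # retries, then jitter by picking a random multiplier in [1, 2**n].
    if retry_count > MAX_RETRIES:
        retry_count = random.randint(1, MAX_RETRIES)
    return MIN_SLEEP_TIME * random.randint(1, 2 ** retry_count)
```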