amazon-sqs-java-extended-client-lib

`sendMessageBatch` edge case when individual batch entries are close to limit (not over the limit)

Open shotmk opened this issue 1 year ago • 1 comments

There is an unhandled edge case in the current `sendMessageBatch` implementation.

According to the documentation here: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-batch-api-actions.html

The total size of all messages that you send in a single SendMessageBatch call can't exceed 262,144 bytes (256 KiB).

Currently we only measure the individual batch entries to verify that each one is under the limit, but this is not enough. We should also check that the entire request is under the threshold.
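To make the edge case concrete, here is a minimal sketch in plain Java with no AWS SDK calls. The class name `BatchSizeEdgeCase`, its helper methods, and the 30 KiB entry size are illustrative assumptions; only the 262,144-byte limit comes from the documentation quoted above. The sketch counts just the UTF-8 body, whereas a real size check would also need to count message attributes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only: not part of the library.
final class BatchSizeEdgeCase {
    // Total-size limit for one batch call, as quoted from the SQS docs.
    static final long LIMIT_BYTES = 262_144L;

    // Size of a body in bytes (attributes are ignored in this sketch).
    static long sizeOf(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    // The per-entry check: true when each entry alone is within the limit.
    static boolean everyEntryFits(List<String> bodies) {
        return bodies.stream().allMatch(b -> sizeOf(b) <= LIMIT_BYTES);
    }

    // The missing check: combined size of all entries in the request.
    static long totalSize(List<String> bodies) {
        return bodies.stream().mapToLong(BatchSizeEdgeCase::sizeOf).sum();
    }

    public static void main(String[] args) {
        // Ten entries of 30 KiB each: every entry is small, the batch is not.
        String body = String.join("", Collections.nCopies(30 * 1024, "a"));
        List<String> batch = Collections.nCopies(10, body);

        System.out.println("every entry fits: " + everyEntryFits(batch));
        System.out.println("total bytes: " + totalSize(batch));
        System.out.println("batch fits: " + (totalSize(batch) <= LIMIT_BYTES));
    }
}
```

With ten 30 KiB entries, each entry passes the per-entry check, yet the combined size is 307,200 bytes, which is above the 262,144-byte limit.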

A few years ago (when there was no ExtendedSNSClient based on AWS SDK v2.x) I implemented my own ExtendedSNSClient, heavily inspired by this ExtendedSQSClient library, and I resolved the issue described here with the following approach:

@Override
public PublishBatchResponse publishBatch(PublishBatchRequest publishBatchRequest) throws AwsServiceException, SdkClientException {
    if (publishBatchRequest == null) {
        String errorMessage = "publishBatchRequest cannot be null.";
        log.error(errorMessage);
        throw SdkClientException.create(errorMessage);
    }

    PublishBatchRequest.Builder publishBatchRequestBuilder = publishBatchRequest.toBuilder();
    appendUserAgent(publishBatchRequestBuilder);
    publishBatchRequest = publishBatchRequestBuilder.build();

    if (!configuration.isPayloadSupportEnabled()) {
        log.trace("Payload support is disabled. Publishing full message to SNS. S3 is not used.");
        return super.publishBatch(publishBatchRequest);
    }

    // Mutable copy of the entries; offloaded entries are replaced in place by index.
    List<PublishBatchRequestEntry> batchEntries = new ArrayList<>(publishBatchRequest.publishBatchRequestEntries());
    List<Integer> smallMessageIndexes = new LinkedList<>();
    // First pass: move entries that individually exceed the SNS threshold to S3
    // and get back the total size of the resulting request.
    long totalSize = moveIndividuallyLargeMessagesToS3(publishBatchRequest.publishBatchRequestEntries(), batchEntries, smallMessageIndexes);
    if (isLarge(totalSize) && !smallMessageIndexes.isEmpty()) {
        // Second pass: if the total size of the request still exceeds the threshold,
        // move all remaining messages of the batch to S3.
        moveRemainingMessagesToS3(batchEntries, smallMessageIndexes);
    }

    return super.publishBatch(publishBatchRequest.toBuilder().publishBatchRequestEntries(batchEntries).build());
}

private long moveIndividuallyLargeMessagesToS3(List<PublishBatchRequestEntry> batchEntriesSrc, List<PublishBatchRequestEntry> batchEntriesTarget, List<Integer> smallMessageIndexes) {
    long totalSize = 0L;
    int index = 0;
    for (PublishBatchRequestEntry entry : batchEntriesSrc) {
        if (StringUtils.isEmpty(entry.message())) {
            String errorMessage = "message cannot be null or empty.";
            log.error(errorMessage);
            throw SdkClientException.create(errorMessage);
        }
        // Check message attributes for ExtendedClient-related constraints.
        checkMessageAttributes(entry.messageAttributes());
        long entrySize = sizeOf(entry);
        if (configuration.isAlwaysThroughS3() || isLarge(entrySize)) {
            log.trace("Storing publish request entry payload to S3");
            entry = storeMessageInS3(entry);
            // Re-measure: the entry now carries only the S3 pointer.
            entrySize = sizeOf(entry);
        } else {
            // Remember entries left inline so the second pass can offload them if needed.
            smallMessageIndexes.add(index);
        }
        totalSize += entrySize;
        batchEntriesTarget.set(index, entry);
        ++index;
    }
    return totalSize;
}

private void moveRemainingMessagesToS3(List<PublishBatchRequestEntry> batchEntries, List<Integer> smallMessageIndexes) {
    for (Integer index : smallMessageIndexes) {
        batchEntries.set(index, storeMessageInS3(batchEntries.get(index)));
    }
}
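
For anyone who wants to try the two-pass idea without the SDK, here is a self-contained sketch of the same control flow over plain strings. Everything in it is a stand-in and should be read as an assumption: `TwoPassBatchOffload`, `prepare`, and `offload` are hypothetical names, `offload` only returns a short placeholder instead of writing to S3, the threshold is hard-coded to 262,144 bytes, and "large" is taken to mean strictly greater than the threshold.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, SDK-free model of the two-pass approach shown above.
final class TwoPassBatchOffload {
    // Assumed threshold; the real client reads it from its configuration.
    static final long LIMIT_BYTES = 262_144L;

    static long sizeOf(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    // Stand-in for storeMessageInS3: returns a small placeholder "pointer".
    static String offload(int index) {
        return "s3-pointer-" + index;
    }

    // Returns the bodies that would actually be sent in the batch request.
    static List<String> prepare(List<String> bodies) {
        List<String> result = new ArrayList<>(bodies);
        List<Integer> smallIndexes = new ArrayList<>();
        long total = 0L;

        // Pass 1: offload entries that are too large on their own.
        for (int i = 0; i < result.size(); i++) {
            if (sizeOf(result.get(i)) > LIMIT_BYTES) {
                result.set(i, offload(i));
            } else {
                smallIndexes.add(i);
            }
            // Count the size of whatever will be sent for this entry.
            total += sizeOf(result.get(i));
        }

        // Pass 2: if the request as a whole is still too large,
        // offload every entry that was left inline.
        if (total > LIMIT_BYTES) {
            for (int i : smallIndexes) {
                result.set(i, offload(i));
            }
        }
        return result;
    }
}
```

Like the code above, the second pass is all-or-nothing: once the total exceeds the threshold, every remaining inline entry is offloaded, which keeps the logic simple at the cost of some extra S3 writes.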

LMK what you think about this. If you agree with the direction, I can work on the PR.

shotmk avatar Jun 04 '24 15:06 shotmk