symfony-messenger
Architectural Improvements for SNS-SQS-Lambda Integration with Symfony Messenger
We have implemented a system where an SNS Topic distributes messages to multiple SQS Queues, each backed by a dedicated Lambda function. The primary reason for this setup is to assign a unique $transportName to each Lambda through environment variables, which is necessary for the Symfony Messenger-based SqsConsumer class from your library.
Current Architecture
- SNS Topic: Distributes messages to multiple SQS Queues and their corresponding Dead Letter Queues (DLQs).
- SQS Queues: Each queue has a dedicated Lambda function.
- Lambda Functions: Created for each SQS Queue to process messages individually, with a unique $transportName set via environment variables.
resource "aws_lambda_function" "user_account_send_mail" {
  // Lambda function configuration...
  environment {
    variables = {
      // Environment variables...
      AWS_SUBSCRIBER_TRANSPORT_NAME = "async_${each.key}"
    }
  }
  // Additional configuration...
}
Currently, each message handler is linked to a specific transport, as indicated by attributes like
#[AsMessageHandler(fromTransport: 'async_subscriber_send_mail_user_changed')]
or
#[AsMessageHandler(fromTransport: 'async_subscriber_backup_user_changed')]
(Both message handlers consume the same event.)
This setup leads to limitations, such as the inability to process the same event by multiple handlers without duplicating messages across different SQS Queues.
Is there a way to eliminate the need for separate transports ($transportName, see https://github.com/brefphp/symfony-messenger/blob/master/src/Service/Sqs/SqsConsumer.php#L38) and deduce this information dynamically, perhaps from the messenger.yaml configuration or the SQS event payload?
I'm asking because I want to get rid of the many Lambdas, one per SQS queue, and use just one Lambda. If it's not possible to get rid of $transportName, do you know a different way I could configure my Lambda dynamically? :)
messenger.yaml
framework:
    messenger:
        transports:
            async_event_user_changed: '%env(MESSENGER_EVENT_USER_CHANGED_TRANSPORT_DSN)%'
            async_subscriber_send_mail_user_removed: '%env(MESSENGER_SUBSCRIBER_SEND_MAIL_USER_REMOVED_TRANSPORT_DSN)%'
            async_subscriber_backup_user_changed: '%env(MESSENGER_SUBSCRIBER_BACKUP_USER_CHANGED_TRANSPORT_DSN)%'
        routing:
            'App\Event\UserChangedEvent': async_event_user_changed
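Regarding the idea of deducing the transport from the SQS event payload: each record in a Lambda SQS event carries an eventSourceARN field that names the source queue, so a single Lambda could in principle look up the transport per record. The following is only a minimal sketch in plain PHP under that assumption. It is not part of the bref/symfony-messenger API; the function name resolveTransportName, the queue names, and the queue-to-transport map are hypothetical, and how the result would be passed to SqsConsumer is left open.

```php
<?php

declare(strict_types=1);

/**
 * Resolve a Messenger transport name from one SQS record of a Lambda event.
 *
 * @param array<string, mixed>  $record           One entry of the event's "Records" list.
 * @param array<string, string> $queueToTransport Hypothetical map: SQS queue name => transport name.
 */
function resolveTransportName(array $record, array $queueToTransport): string
{
    $arn = $record['eventSourceARN'] ?? null;
    if (!is_string($arn)) {
        throw new InvalidArgumentException('SQS record has no eventSourceARN.');
    }

    // An SQS ARN looks like arn:aws:sqs:<region>:<account-id>:<queue-name>,
    // so the queue name is everything after the last colon.
    $lastColon = strrpos($arn, ':');
    if ($lastColon === false) {
        throw new InvalidArgumentException(sprintf('Unexpected ARN format: "%s".', $arn));
    }
    $queueName = substr($arn, $lastColon + 1);

    if (!isset($queueToTransport[$queueName])) {
        throw new RuntimeException(sprintf('No transport configured for queue "%s".', $queueName));
    }

    return $queueToTransport[$queueName];
}

// Hypothetical queue names; the transport names are the ones from messenger.yaml.
$queueToTransport = [
    'subscriber-send-mail-user-removed' => 'async_subscriber_send_mail_user_removed',
    'subscriber-backup-user-changed' => 'async_subscriber_backup_user_changed',
];

// Reduced example record; a real record also contains body, attributes, etc.
$record = [
    'eventSourceARN' => 'arn:aws:sqs:eu-central-1:123456789012:subscriber-backup-user-changed',
];

echo resolveTransportName($record, $queueToTransport), PHP_EOL;
```

With the example record above, the lookup yields async_subscriber_backup_user_changed. The map could just as well be generated from the transport DSNs instead of being written by hand; that part is not shown here.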
> Currently, each message handler is linked to a specific transport
I'm very curious to know how you came up with this solution.
> I'm very curious to know how you came up with this solution.
I spent a lot of time figuring out how to implement an event-driven architecture with SNS in combination with SQS. After a couple of tries I arrived at this solution. It was helpful to have specific queues for specific handlers, so that I can handle dead-letter queue entries one by one and avoid retrying messages that were already handled. Imagine an event with two handlers on a shared queue, where one of them fails. The event goes to the dead-letter queue. When I retry this event, it is handled by both handlers again. If you have a specific question, just let me know. :)
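The retry scenario described above can be illustrated with a toy simulation. This is plain PHP written only for illustration, not Messenger or Bref code: the functions deliver and runScenario and the handler names are hypothetical, and a "queue" is just a list of handlers. The backup handler is assumed to fail on its first attempt and succeed on the retry.

```php
<?php

declare(strict_types=1);

/**
 * Deliver one message to all handlers of a queue and count the calls.
 * Returns false if any handler threw, which stands in for a dead-lettered message.
 *
 * @param array<string, callable> $handlers Handler name => callable.
 * @param array<string, int>      $calls    Call counter per handler, updated in place.
 */
function deliver(array $handlers, array &$calls): bool
{
    $ok = true;
    foreach ($handlers as $name => $handler) {
        $calls[$name] = ($calls[$name] ?? 0) + 1;
        try {
            $handler();
        } catch (Throwable $e) {
            $ok = false;
        }
    }

    return $ok;
}

/** @return array<string, int> Number of times each handler ran. */
function runScenario(bool $sharedQueue): array
{
    $backupAttempts = 0;
    $handlers = [
        'send_mail' => static function (): void {},
        'backup' => static function () use (&$backupAttempts): void {
            if (++$backupAttempts === 1) {
                throw new RuntimeException('backup failed on the first attempt');
            }
        },
    ];

    // Either one queue with both handlers, or one queue per handler.
    $queues = $sharedQueue
        ? [$handlers]
        : [['send_mail' => $handlers['send_mail']], ['backup' => $handlers['backup']]];

    $calls = [];
    foreach ($queues as $queueHandlers) {
        if (!deliver($queueHandlers, $calls)) {
            // Retry from the dead-letter queue: the message is delivered once more.
            deliver($queueHandlers, $calls);
        }
    }

    return $calls;
}

print_r(runScenario(true));  // shared queue
print_r(runScenario(false)); // one queue per handler
```

In the shared-queue run, send_mail is counted twice even though it succeeded the first time; with one queue per handler it runs once and only backup is retried.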
OK, I got it, thanks for your explanation!
It looks like we've chosen 2 totally different solutions, so I'm still very curious about what others achieve ;)