[watermill-amqp] Implement Worker Pool for Concurrent Message Processing
Feature request
Description
I would like to propose adding a worker pool to the Watermill library so that messages can be processed concurrently. Currently, the ProcessMessages function handles messages one at a time, so a slow handler, or an output channel that is not consumed quickly enough, delays every message queued behind it.
Example use case
In high-volume scenarios, such as event-driven architectures or real-time data processing systems, this blocking behavior can become a bottleneck. With a worker pool, several messages can be processed at the same time, which should improve throughput and reduce latency when individual messages are slow to handle.
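The throughput argument can be illustrated with a small, library-independent sketch. It does not use any Watermill code: `runPool`, its parameters, and the one-millisecond sleep that stands in for handler work are all assumptions made for the example. It fans jobs out to a fixed number of goroutines and reports how many were handled and how many were in flight at the same time.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runPool is a hypothetical helper (not part of Watermill). It sends n jobs
// to a fixed number of workers and returns how many jobs were handled and
// the highest number of jobs that were in flight at once.
func runPool(n, workers int) (processed, maxInFlight int) {
	jobs := make(chan int)
	var wg sync.WaitGroup
	var done, inFlight, peak int64

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range jobs {
				cur := atomic.AddInt64(&inFlight, 1)
				// Record the peak concurrency with a compare-and-swap loop.
				for {
					old := atomic.LoadInt64(&peak)
					if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
						break
					}
				}
				time.Sleep(time.Millisecond) // stand-in for slow handler work
				atomic.AddInt64(&inFlight, -1)
				atomic.AddInt64(&done, 1)
			}
		}()
	}

	for i := 0; i < n; i++ {
		jobs <- i
	}
	close(jobs) // lets every worker's range loop finish
	wg.Wait()
	return int(done), int(atomic.LoadInt64(&peak))
}

func main() {
	processed, peak := runPool(100, 10)
	fmt.Printf("processed=%d peak in-flight=%d\n", processed, peak)
}
```

The peak never exceeds the number of workers, so the pool size is the knob that bounds concurrency. One trade-off to keep in mind: once several workers run in parallel, messages are no longer guaranteed to finish in the order they arrived.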
How it could look in code
func (s *subscription) ProcessMessages(ctx context.Context) {
	amqpMsgs, err := s.createConsumer(s.queueName, s.channel)
	if err != nil {
		s.logger.Error("Failed to start consuming messages", err, s.logFields)
		return
	}

	// Buffered channel for output messages; the buffer size could follow
	// `PrefetchCount` instead of a hard-coded 100.
	out := make(chan *message.Message, 100)
	defer close(out) // lets the workers' range loops exit when consuming stops

	// Start worker pool with a fixed number of workers
	for i := 0; i < 10; i++ { // Number of workers
		go s.worker(ctx, out)
	}

	// Consuming loop
	for {
		select {
		case <-ctx.Done():
			return
		case amqpMsg, ok := <-amqpMsgs:
			if !ok {
				return // delivery channel was closed
			}
			if err := s.processMessage(ctx, amqpMsg, out, s.logFields); err != nil {
				// Handle error
			}
		// Handle other cases (e.g., notifyCloseChannel, closing, etc.)
		}
	}
}

// worker drains out until the channel is closed.
func (s *subscription) worker(ctx context.Context, out chan *message.Message) {
	for msg := range out {
		_ = msg // placeholder so the sketch compiles
		// Process the message
		// Acknowledge or nack as needed
	}
}
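The worker body above is left as comments, so here is a minimal, self-contained sketch of what "process, then ack or nack" could look like. It is not Watermill code: the `delivery` type, `worker`, `runWorkers`, and the handler that fails on odd ids are all hypothetical stand-ins chosen so the example runs with the standard library only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// delivery is a hypothetical stand-in for a broker message; it is not a
// Watermill or AMQP type.
type delivery struct {
	id   int
	ack  func()
	nack func()
}

// worker handles deliveries until the channel is closed or ctx is cancelled.
// A handler error results in a nack, success in an ack.
func worker(ctx context.Context, in <-chan delivery, handle func(delivery) error) {
	for {
		select {
		case <-ctx.Done():
			return
		case d, ok := <-in:
			if !ok {
				return
			}
			if err := handle(d); err != nil {
				d.nack()
				continue
			}
			d.ack()
		}
	}
}

// runWorkers pushes n deliveries through a pool and counts acks and nacks.
// The demo handler fails for odd ids to exercise the nack path.
func runWorkers(n, workers int) (acked, nacked int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan delivery, workers)
	var a, k int64
	handle := func(d delivery) error {
		if d.id%2 == 1 {
			return errors.New("simulated handler failure")
		}
		return nil
	}

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker(ctx, in, handle)
		}()
	}

	for i := 0; i < n; i++ {
		in <- delivery{
			id:   i,
			ack:  func() { atomic.AddInt64(&a, 1) },
			nack: func() { atomic.AddInt64(&k, 1) },
		}
	}
	close(in) // no more deliveries; workers exit after draining
	wg.Wait()
	return int(a), int(k)
}

func main() {
	acked, nacked := runWorkers(10, 3)
	fmt.Println("acked:", acked, "nacked:", nacked) // prints "acked: 5 nacked: 5"
}
```

Waiting on the `sync.WaitGroup` before returning means every delivery has been acked or nacked by the time the pool shuts down, which is the property a real implementation would also need before closing the AMQP channel.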