amqp-client
amqp-client copied to clipboard
Adding multiple Bindings to a Consumer results in multiple consumer instances
We use this great wrapper for AMQP client to consume multiple routingKeys on a single Exhange.
For that, we send AddBinding
to the Consumer
as documented:
consumer ! Record(AddBinding(Binding(StandardExchanges.amqDirect, QueueParameters("my_queue", passive = false, durable = false, exclusive = false, autodelete = true), "my_key_1")))
consumer ! Record(AddBinding(Binding(StandardExchanges.amqDirect, QueueParameters("my_queue", passive = false, durable = false, exclusive = false, autodelete = true), "my_key_2")))
In the AMQP web-interface I now see that for the example snippet above there are 2 "Consumers".
Maybe this is the intended behavior - but what seems strange then is when there are only messages for the routingKey "my_key_1"
both Consumers seem to handle this message.
It would be cool if Amqp.Binding
could take more than one routingKey.
What do you think? Would that be the "correct" approach?
Hello, You're right, the library creates a new "virtual" consumer when you add a new binding. I'll check but I'm pretty sure that it is "scalable" from a performance point of view. I'll try and think of a way of handling multiple routing keys (I think that I tried, failed and gave it up because it's not really needed :)) I don't really understand the second part of your message, can you reproduce the problem with a simple example ? Thank you
When creating a "ReactiveStreams" "Publisher" and used this as listener
Actor in your Consumer
I noticed that when I configured a "qos" (prefetch count) of 1 for the ChannelParameters
, the listener
Actor not only got 1 Message at a time, but up to amount of routingKeys
messages.
So the "prefetch count" multiplied with the "virtual" consumers which was not what I expected.
Do you have a small, complete code sample ? Thanks
To be honest it would be quite complicated to share the original code (corporate restrictions, etc.). But I'll try building a minimal example.
My idea would be to change Amqp.Binding
to have the routingKeys as varargs (routingKeys: String*
) and in the Consumer
bind for all declared routingKeys:
binding.routingKeys.foreach(key => c.queueBind(queueName, binding.exchange.name, key))
This way you only get 1 consumer which handles an arbitrary amount of routingKeys.
What do you think?
Sorry I did not respond in such a long time.
I created a pull request https://github.com/sstone/amqp-client/pull/73 containing a "fix" for this one.
That way when using the AddBinding
message one can choose if there should be only 1 "virtual consumer" for all the provided routingKeys (by passing in a Seq
as the varargs argument) or create one "virtual consumer" for each routing key (as the previous behavior is).
The PR is binary compatible and also the behavior is the same as before. Passing in multiple routingKeys is only an optional enhancement.
With that one can now exactly configure the amount of consumers (or use exactly 1) when working with multiple routing keys.