amqp-client icon indicating copy to clipboard operation
amqp-client copied to clipboard

Adding multiple Bindings to a Consumer results in multiple consumer instances

Open thjaeckle opened this issue 9 years ago • 5 comments

We use this great wrapper for AMQP client to consume multiple routingKeys on a single Exhange. For that, we send AddBinding to the Consumer as documented:

consumer ! Record(AddBinding(Binding(StandardExchanges.amqDirect, QueueParameters("my_queue", passive = false, durable = false, exclusive = false, autodelete = true), "my_key_1")))
consumer ! Record(AddBinding(Binding(StandardExchanges.amqDirect, QueueParameters("my_queue", passive = false, durable = false, exclusive = false, autodelete = true), "my_key_2")))

In the AMQP web-interface I now see that for the example snippet above there are 2 "Consumers".

Maybe this is the intended behavior - but what seems strange then is when there are only messages for the routingKey "my_key_1" both Consumers seem to handle this message.

It would be cool if Amqp.Binding could take more than one routingKey. What do you think? Would that be the "correct" approach?

thjaeckle avatar Sep 21 '15 08:09 thjaeckle

Hello, You're right, the library creates a new "virtual" consumer when you add a new binding. I'll check but I'm pretty sure that it is "scalable" from a performance point of view. I'll try and think of a way of handling multiple routing keys (I think that I tried, failed and gave it up because it's not really needed :)) I don't really understand the second part of your message, can you reproduce the problem with a simple example ? Thank you

sstone avatar Sep 21 '15 21:09 sstone

When creating a "ReactiveStreams" "Publisher" and used this as listener Actor in your Consumer I noticed that when I configured a "qos" (prefetch count) of 1 for the ChannelParameters, the listener Actor not only got 1 Message at a time, but up to amount of routingKeys messages.

So the "prefetch count" multiplied with the "virtual" consumers which was not what I expected.

thjaeckle avatar Sep 22 '15 06:09 thjaeckle

Do you have a small, complete code sample ? Thanks

sstone avatar Sep 22 '15 14:09 sstone

To be honest it would be quite complicated to share the original code (corporate restrictions, etc.). But I'll try building a minimal example.

My idea would be to change Amqp.Binding to have the routingKeys as varargs (routingKeys: String*) and in the Consumer bind for all declared routingKeys:

binding.routingKeys.foreach(key => c.queueBind(queueName, binding.exchange.name, key))

This way you only get 1 consumer which handles an arbitrary amount of routingKeys.

What do you think?

thjaeckle avatar Sep 24 '15 15:09 thjaeckle

Sorry I did not respond in such a long time.

I created a pull request https://github.com/sstone/amqp-client/pull/73 containing a "fix" for this one. That way when using the AddBinding message one can choose if there should be only 1 "virtual consumer" for all the provided routingKeys (by passing in a Seq as the varargs argument) or create one "virtual consumer" for each routing key (as the previous behavior is).

The PR is binary compatible and also the behavior is the same as before. Passing in multiple routingKeys is only an optional enhancement.

With that one can now exactly configure the amount of consumers (or use exactly 1) when working with multiple routing keys.

thjaeckle avatar Mar 30 '16 09:03 thjaeckle