go-rabbitmq icon indicating copy to clipboard operation
go-rabbitmq copied to clipboard

Each consumer/ producer has it's own connection?

Open Nikola-Milovic opened this issue 2 years ago • 1 comments

Hello, from my understanding of the code, each consumer/ publisher will have its own connection, which contradicts the recommendation of 1 connection per application. In theory, every consumer/ producer should use the same connection but have their own channel? Is this something that's intentional by design, or is there room for improvement here?

Nikola-Milovic avatar Jun 05 '22 11:06 Nikola-Milovic

It's by design. The consumers and producers are thread safe. So when I use this library, I generally just have one producer and one consumer.

wagslane avatar Jun 05 '22 12:06 wagslane

Hi there, If consumer and publisher are thread safe, why then comment above Close method says that it is not safe to reuse consumet\producer? https://github.com/wagslane/go-rabbitmq/blob/main/publish.go#L211 https://github.com/wagslane/go-rabbitmq/blob/main/consume.go#L131

JFFby avatar Sep 21 '22 18:09 JFFby

There is room for improvement here! However, at the moment to use this library effectively you should be creating new consumers for each queue you want to consume from. A given consumer is thread-safe in that you can distribute the work of that consumer across many goroutines.

wagslane avatar Oct 03 '22 00:10 wagslane

Should this be more prominently called out or actually produce an error? I've also ran into this by calling StartConsuming multiple times to eventually find out only one of them is actually reconnected 🤔

johanneswuerbach avatar Oct 05 '22 09:10 johanneswuerbach

https://pkg.go.dev/github.com/rabbitmq/amqp091-go

It's advisable to use separate connections for Channel.Publish and Channel.Consume so not to have TCP pushback on publishing affect the ability to consume messages, so this parameter is here mostly for completeness.

Note: RabbitMQ prefers to use TCP push back to control flow for all channels on a connection, so under high volume scenarios, it's wise to open separate Connections for publishings and deliveries.

Note: RabbitMQ will rather use TCP pushback on the network connection instead of sending basic.flow. This means that if a single channel is producing too much on the same connection, all channels using that connection will suffer, including acknowledgments from deliveries. Use different Connections if you desire to interleave consumers and producers in the same process to avoid your basic.ack messages from getting rate limited with your basic.publish messages.

Chameleon-m avatar Nov 29 '22 12:11 Chameleon-m

I'm working on a fix for this. I'm thinking the new API will look like this:

conn := NewConnectionManager()
consumer := NewConsumer(conn, ...)

Each consumer/publisher would then share an underlying connection manager.

wagslane avatar Nov 29 '22 21:11 wagslane

Hey @wagslane. I was thinking the same thing. For my use case, We are creating a new subscriber for a particular Server Sent Event connection. Creating a new connection each time is not ideal.

I can help with the new API that can support multiple consumers while maintaining a single connection.

aaqaishtyaq avatar Nov 30 '22 17:11 aaqaishtyaq

This should be resolved now on the main branch. Please check the docs and let me know if I missed something.

wagslane avatar Nov 30 '22 19:11 wagslane

Hey @wagslane, We are still re-using the channel. Ideally, for each consumer, we should create a new channel.

Currently, there are two issues here:

  1. RabbitMQ does not detect that the channel is closed and it will not delete temporary queues until the connection is closed (We are using the same channel here once the connection is made, and only updating it on reconnect).
  2. No way to set Qos for individual consumers as the condition applies to the channel and we are re-using the channel.

How I see is that we need to manage reconnection for multiple channels and update it once and get a new channel for all old channels. Any thoughts?

In this case, As you can see the queue is still up but the consumer is closed (Not the connection). image

aaqaishtyaq avatar Dec 01 '22 14:12 aaqaishtyaq

Derp. You are correct. I did this too quickly one day without thinking it through. I'll make another update when I have some time. This shouldn't change the API too much, mostly internals

wagslane avatar Dec 01 '22 15:12 wagslane

okay, please check the main branch again.

There is now a single "connection manager" and a "channel manager" for each publisher and consumer. When an issue happens, both reconnect independently. The channel depends on a connection, so first the connection reconnects successfully, then all the channels should reconnect, then consumption and publishing resumes.

Let me know if you see other issues with this approach.

wagslane avatar Dec 01 '22 18:12 wagslane

Nice. It fixed the QosPrefetch. The temporary queue is still not getting cleared as the channel is open. Added a PR to close the channel once the consumer is stopped.

Please review the PR. I have tested running multiple temporary consumers and tried to close one of them and still received messages on other consumers. Even the queue is cleared up a few seconds after the consumer stops. Let me know if you need anything or code examples to test the behaviour.

aaqaishtyaq avatar Dec 02 '22 08:12 aaqaishtyaq

Resolved in 0.11.0

wagslane avatar Dec 05 '22 18:12 wagslane

Hi @wagslane , so its safe to reuse Publisher and Consumer now ? I want to init 1 Publisher and reuse in my application

tnm0113 avatar Mar 06 '23 04:03 tnm0113