
*Channel.NotifyClose() not receiving errors on connection close when publishing a high volume of data

[Open] chanpl opened this issue 4 years ago • 2 comments

https://github.com/streadway/amqp/blob/edfb9018d2714e4ec54dbaba37dbfef2bdadf0e4/connection.go#L233

I am working on a reconnect function for my company's project. I found that when I keep publishing a large volume of data, NotifyClose() doesn't receive any error when the connection closes. However, if I add a small delay (10 µs) between publishes, NotifyClose() works as expected. My guess is that a large number of "publish" operations get scheduled in the CPU / Go runtime queue ahead of the "notify".

To reproduce the error:

  1. Spin up a RabbitMQ server (in Docker).
  2. Run a publisher script that keeps sending messages to the server with no delay between sends. Meanwhile, another goroutine listens on NotifyClose() and prints any error it receives.
  3. Restart the RabbitMQ server (docker restart ...).

Here is what *Channel.Publish() returns:

2019/10/15 16:40:48 write tcp 127.0.0.1:37772->127.0.0.1:5672: write: broken pipe
2019/10/15 16:40:48 write tcp 127.0.0.1:37772->127.0.0.1:5672: write: broken pipe
2019/10/15 16:40:48 write tcp 127.0.0.1:37772->127.0.0.1:5672: write: broken pipe
2019/10/15 16:40:48 write tcp 127.0.0.1:37772->127.0.0.1:5672: write: broken pipe
2019/10/15 16:40:48 Exception (504) Reason: "channel/connection is not open"
2019/10/15 16:40:48 Exception (504) Reason: "channel/connection is not open"
2019/10/15 16:40:48 Exception (504) Reason: "channel/connection is not open"
2019/10/15 16:40:48 Exception (504) Reason: "channel/connection is not open"
2019/10/15 16:40:48 Exception (504) Reason: "channel/connection is not open"

I'd like to confirm my assumption and find out whether adding a small delay (10 µs) between publishes is a correct and robust solution. Thanks in advance for the help.

chanpl · Oct 17 '19 02:10

You need to check for errors when you call Channel.Publish().

lukebakken · Oct 17 '19 14:10

> You need to check for errors when you call Channel.Publish().

Thanks for the suggestion. The output I quoted in my question is the error returned by Channel.Publish(). However, Channel.Publish() is asynchronous: a nil return doesn't mean the server received the message. The notification from Channel.NotifyClose() arrives even later, after a whole series of delivery errors.

Making sends synchronous by waiting on Channel.NotifyPublish() is much slower...

Any recommendations would be appreciated.

chanpl · Oct 18 '19 03:10