Connection.Close intermittently hangs when connection.NotifyClose channels are not consumed

Open nvx opened this issue 5 years ago • 6 comments

While debugging an intermittent hang when calling Connection.Close() I discovered the following:

When it hangs I can see it waiting on channel send at this line: https://github.com/streadway/amqp/blob/70e15c650864f4fc47f5d3c82ea117285480895d/connection.go#L390

Interestingly, to reach that line it means that Connection.shutdown() has been called with a non-nil error. Connection.Close() calls Connection.shutdown(nil) directly as a defer here https://github.com/streadway/amqp/blob/70e15c650864f4fc47f5d3c82ea117285480895d/connection.go#L324

Noting that it's wrapped in a sync.Once, I'm guessing there's a race condition: if the server is slow to respond, Connection.shutdown(nil) is called first from the defer and no hang occurs, while if the server responds quickly enough, shutdown is called first with the server's reply (a non-nil error) and hangs.

The root cause is, of course, a channel in Connection.closes that is being written to with no consumer. Once I tracked down where Connection.closes was being set, my mistake became pretty apparent, but the intermittent nature of the fault made it interesting to track down.

The documentation for connection.NotifyClose() doesn't mention that failing to consume messages from the channel can cause connection.Close() to hang intermittently. In fact, it implies that during a graceful shutdown it should be perfectly safe to stop consuming from the channel: "On normal shutdowns, the chan will be closed."

The obvious fix in my code was to give the channel passed to NotifyClose a buffer size of 1. But there remains, at the very least, a documentation issue, or a race condition in which a sufficiently fast server sends a connection-closed message, triggering a Connection.shutdown() call with a non-nil error before Connection.Close() does.

On an unrelated note, I really like this library and thanks for all the hard work all the contributors have put into it!

nvx avatar Sep 17 '18 14:09 nvx

That's a good question. Other clients provide a way to "abandon" a connection (close it and ignore any possible errors) in part because of scenarios like this. How would you suggest it should be handled in this client/idiomatic Go?

michaelklishin avatar Feb 16 '19 21:02 michaelklishin

I haven't seen any examples of close and ignore errors in the standard library, but I have seen examples where a channel buffer of size 1 is used to avoid a hang on write. My understanding is the GC would free it when no longer referenced in such cases.

That's not to say there isn't an idiomatic way of closing and ignoring errors, just that I can't think of one.

nvx avatar Feb 16 '19 22:02 nvx

I'm not sure why that would be a blocker in any way. A library such as this one can decide to provide a way to close a connection (or any other resource) and ignore all errors.

#278 is also relevant here but is not a pre-requisite.

michaelklishin avatar Feb 19 '19 17:02 michaelklishin

I'm seeing this as well, following the consumer example. I'm trying to capture a SIGINT, empty the queue and exit cleanly, but when I call Channel.Cancel() it hangs indefinitely. This doesn't happen with Channel.Close() or Conn.Close().

ajsharp avatar Jul 03 '20 01:07 ajsharp

Retracting above. May have been an issue with my code. Apologies.

ajsharp avatar Jul 04 '20 02:07 ajsharp

@ajsharp might not be far off. We have one flaky test and it is specifically testing the path for cleaning up the channel after a queue is deleted. So not calling Channel.Cancel(), but forcing the server to send it by deleting a queue with an active consumer.

However, this may be more related to #278 as our problem manifests on creating a new channel immediately after the queue is deleted. The channel creation locks here:

goroutine 483 [select]:
github.com/streadway/amqp.(*Channel).call(0xc000319e60, 0xc446e0, 0xc000487c70, 0xc000259880, 0x1, 0x1, 0x1b, 0xb5c2c5)
  /vendor/github.com/streadway/amqp/channel.go:176 +0x149
github.com/streadway/amqp.(*Channel).open(...)
  /vendor/github.com/streadway/amqp/channel.go:165
github.com/streadway/amqp.(*Connection).openChannel(0xc0002d1180, 0xc000482e80, 0xc000180410, 0xc000259950)
  /vendor/github.com/streadway/amqp/connection.go:631 +0xd7
github.com/streadway/amqp.(*Connection).Channel(...)
  /vendor/github.com/streadway/amqp/connection.go:653

HakShak avatar Sep 08 '20 23:09 HakShak