amqp
amqp copied to clipboard
connection shutdown, channel close data race
I am running tests with go test -v -race -cpu=1,2,4 and, I am not sure but, I think I found the following data race (commit b4f3ceab0337f013208d31348b578d83c0064744):
==================
WARNING: DATA RACE
Read by goroutine 79:
github.com/streadway/amqp.(*Channel).call()
/opt/go/src/github.com/streadway/amqp/channel.go:144 +0x70
github.com/streadway/amqp.(*Channel).Close()
/opt/go/src/github.com/streadway/amqp/channel.go:406 +0x207
[ommited]
Previous write by goroutine 80:
github.com/streadway/amqp.(*Channel).shutdown.func1()
/opt/go/src/github.com/streadway/amqp/channel.go:104 +0x17b
sync.(*Once).Do()
/home/ubuntu/go/src/sync/once.go:44 +0xf6
github.com/streadway/amqp.(*Channel).shutdown()
/opt/go/src/github.com/streadway/amqp/channel.go:134 +0x98
github.com/streadway/amqp.(*Connection).closeChannel()
/opt/go/src/github.com/streadway/amqp/connection.go:580 +0x38
github.com/streadway/amqp.(*Connection).shutdown.func1()
/opt/go/src/github.com/streadway/amqp/connection.go:350 +0x1d5
sync.(*Once).Do()
/home/ubuntu/go/src/sync/once.go:44 +0xf6
github.com/streadway/amqp.(*Connection).shutdown()
/opt/go/src/github.com/streadway/amqp/connection.go:370 +0x93
github.com/streadway/amqp.(*Connection).dispatch0()
/opt/go/src/github.com/streadway/amqp/connection.go:394 +0x4d5
github.com/streadway/amqp.(*Connection).demux()
/opt/go/src/github.com/streadway/amqp/connection.go:377 +0x62
github.com/streadway/amqp.(*Connection).reader()
/opt/go/src/github.com/streadway/amqp/connection.go:471 +0x322
It appears that Connection.shutdown is accessing me.channels to close them (goroutine 80) without any kind of mutex locking, while concurrently getting a Connection.releaseChannel (via channel.Close) which accesses me.channels too (using the connection lock though).
After trying to fix this by protecting Connection.shutdown I noticed I started having another race condition due to Channel.call accessing me.send while Channel.shutdown changes it to sendClosed.
Seriously, I don't know if this the correct fix.
I wrote a simple test that causes at least one of these issues to happen, and enabled -race for travis builds.
There is another data race related to IO handling in shared_test.go or whatever.
I didn't understand exactly what the problem was, but it looked like a test code-only problem, and since it is out of the scope of this PR I leave it open for you to take a look first.
Please notice too that I had to remove the locking around Channel.call for Channel.Confirm because I applied the lock inside Channel.call while calling Channel.send. Therefore this PR also applies the fix proposed in #187 .
Additionally, since adding -race makes the build explode on travis due to other problems, I first fixed them on #197 and rebased that branch on this PR's branch.
I am sorry, it took me a while to figure it out there was a deadlock in the use of pipes with go1.1. The last commit should work around it.
Looks reasonable at first approximation, thank you.
Note that this PR also seems to include https://github.com/streadway/amqp/pull/197.
Please rebase this against master.
Note that this PR also seems to include #197.
Yes. I left a comment regarding that. Please check: https://github.com/streadway/amqp/pull/196#issuecomment-202242870
Just committed one of the changes you asked and the rebase against master.
Note that #197, when merged, broke CI in ways that I cannot reproduce in other environments.
Please note that travis is failing because it is detecting a data race in connection.go that was not fixed by #210 against which I handled the merged conflict by using that version (therefore removing my original code).
https://api.travis-ci.org/jobs/179778933/log.txt?deansi=true
My original code, I believe fixes this by protecting the whole shutdown. @DmitriyMV @michaelklishin Please let me know what you think before I try to commit it: https://github.com/imkira/amqp/blob/hotfix/connection-shutdown-race-orig/connection.go#L344
@imkira According to the Travis CI - it still doesn't pass tests. I manually applied this pull request and reverted #210. Still - 4 Data Races https://github.com/DmitriyMV/amqp/commits/pull/196 https://travis-ci.org/DmitriyMV/amqp/builds/179808513
Any advice?
@DmitriyMV OK, so I ended up removing all #210 code which was just a subset of the problems I addressed originally with this PR.
The "illusiveness" of the bugs in here are, in my opinion, a reflection of the lack of -race for a long time in this project resulting in a huge amount of untested code being committed to the package, tricky code with lack of goroutine synchronization and finalizers, and in general, IMHO, bad (too complex) synchronization practices present in this library :(
I added https://github.com/streadway/amqp/pull/196/commits/524f95a01ca0d2b1ebfd906a027ac2de6e5a0526 to try to fix some of these problems.
With this commit I am synchronizing Connection.reader() with Connection.Close and doing some other tricky things:
- I found places where the reader and the heartbeater relied on each other but if the heartbeater finished first (via NotifyClose), sometimes reader would be stuck trying to send the heartbeat to a full channel (me.sends).
- I also noticed
me.blockswere being used inConnection.dispatch0without any mutex, despite the fact that they are modifiable viaNofifyBlockedand accessed during destruction (actually closed, which is even worse). - I also force connections to be closed in tests so that when the test function finishes processing everything returns back to a clean state.
Please let me know what you think.
By the way, I am getting the following error sometimes on travis https://api.travis-ci.org/jobs/179929218/log.txt?deansi=true:
--- FAIL: TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46 (0.09s)
shared_test.go:68: issue46 close
integration_test.go:1411: expected error only on publish, got error on channel.open: Exception (503) Reason: "COMMAND_INVALID - second 'channel.open' seen"
Do you have any idea why this may be happening?
Sorry, no. I spent last two days, trying to figure out, why this problem occurs (at random times too). My current thought is that we get channelClose, but don't wait for channelCloseOk confirmation in
func (me *Channel) dispatch(msg message) {
switch m := msg.(type) {
case *channelClose:
me.connection.closeChannel(me, newError(m.ReplyCode, m.ReplyText))
me.send(me, &channelCloseOk{})
...
I'm not familiar with AMQP protocol tho. And because of that I'm left to guesses.
There is no channel.close-ok confirmation. There is channel.close which can be sent by both clients and server and there is channel.close-ok that confirms it.
@michaelklishin what if we are immediately trying to open channel after we had closed it, with the same channel id?
Can you describe a second 'channel.open' seen error a bit more, and what could be the source of it. My google-fu showed me numerous amounts of libraries who are dealing with this error from time to time, but the source of the problem is different too. Each time it's wrong usage in user code. No exact specifics tho.
EDIT: Clarification.
@DmitriyMV I thought the reusing of the channel could be a good explanation for what is happening, but when you look at TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46, the function only uses 1 connection, creates 100 channels and sends 10 messages with each serially, and doesn't close any of them. So, I think a bug in the allocator is out of question.
The error message received from the server mentions second 'channel.open' seen but, irrespective of that, and according to the reference https://www.rabbitmq.com/amqp-0-9-1-reference.html#constant.channel-error :
command-invalid
503 connection
The client sent an invalid sequence of frames, attempting to perform an operation that was considered invalid by the server. This usually implies a programming error in the client.
It implies a "invalid sequence of frames". What if we are not waiting enough time between creating a channel and publishing to it? I looked at the code but couldn't find a reason for it, though I found this case block maybe "too inclusive":
https://github.com/streadway/amqp/blob/1b8853833a94be8ba57fd121a8651f7221e5b051/channel.go#L298-L299
But the other tests do not fail, so either they don't generate enough Open/Close channel sequences or it has something to do with situation where channel got closed by the RabbitMQ due to error.
@imkira I'm open to ideas of how to create a test which will show if we actually send wrong sequence of frames, without provoking an error from server in the first place (like TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46 do).
Hey folks,
I'm posting this on behalf of the core team.
As you have noticed, this client hasn't seen a lot of activity recently. Many users are unhappy about that and we fully recognize that it's a popular library that should be maintained more actively. There are also many community members who have contributed pull requests and haven't been merged for various reasons.
Because this client has a long tradition of "no breaking public API changes", certain reasonable changes will likely never be accepted. This is frustrating to those who have put in their time and effort into trying to improve this library.
We would like to thank @streadway for developing this client and maintaining it for a decade — that's a remarkable contribution to the RabbitMQ ecosystem. We this now is a good time to get more contributors involved.
Team RabbitMQ has adopted a "hard fork" of this client in order to give the community a place to evolve the API. Several RabbitMQ core team members will participate but we think it very much should be a community-driven effort.
What do we mean by "hard fork" and what does it mean for you? The entire history of the project is retained in the new repository but it is not a GitHub fork by design. The license remains the same 2-clause BSD. The contribution process won't change much (except that we hope to review and accept PRs reasonably quickly).
What does change is that this new fork will accept reasonable breaking API changes according
to Semantic Versioning (or at least our understanding of it). At the moment the API is identical
to that of streadway/amqp but the package name is different. We will begin reviewing PRs
and merging them if they make sense in the upcoming weeks.
If your PR hasn't been accepted or reviewed, you are welcome to re-submit it for rabbitmq/amqp091-go.
RabbitMQ core team members will evaluate the PRs currently open for streadway/amqp as time allows,
and pull those that don't have any conflicts. We cannot promise that every PR would be accepted
but at least we are open to changing the API going forward.
Note that it is a high season for holidays in some parts of the world, so we may be slower to respond in the next few weeks but otherwise, we are eager to review as many currently open PRs as practically possible soon.
Thank you for using RabbitMQ and contributing to this client. On behalf of the RabbitMQ core team, @chunyilyu and @michaelklishin.