aioamqp
Channel publish requires a lock
Steps to reproduce:
- Open a channel
- Call channel.basic_publish(...) or channel.publish() 100 times concurrently on that channel
My naive expectation: 100 messages get written to the server, one after another. Actual result: frames from different messages are interleaved on the wire, leading to undefined behavior (and undelivered messages).
The problem: each AMQP message consists of at least three frames (more if the message is large enough that its body has to be split up). After sending frame 1 for message A, frames 2 and 3 for message A must be written before frame 1 for message B is written. This is fairly intuitive in synchronous libraries (e.g., the Java docs say "one channel per thread"), but it isn't intuitive in asyncio-land.
In a nutshell: this didn't feel intuitive.
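To make the interleaving concrete, here is a minimal simulation that does not use aioamqp at all. `FakeChannel`, `_write_frame` and the three frame names are hypothetical stand-ins, and the sketch assumes each frame write yields to the event loop once; the real library may yield at different points.

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for a channel; NOT aioamqp's real class."""

    def __init__(self):
        # Frames in the order they would reach the socket.
        self.wire = []

    async def _write_frame(self, frame):
        self.wire.append(frame)
        # Assumption: writing a frame gives other tasks a chance to run.
        await asyncio.sleep(0)

    async def publish(self, name):
        # One message = method frame + header frame + body frame.
        for kind in ("method", "header", "body"):
            await self._write_frame((name, kind))


async def main():
    channel = FakeChannel()
    # Two concurrent publishes on the same channel.
    await asyncio.gather(channel.publish("A"), channel.publish("B"))
    return channel.wire


wire = asyncio.run(main())
for frame in wire:
    print(frame)
```

In this sketch the frames for A and B alternate instead of arriving as two contiguous groups of three, which is the ordering a broker cannot make sense of.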
I have three ideas for making aioamqp easier to work with:
- Add docstrings to channel.publish and channel.basic_publish, explaining that they can't be called concurrently on the same channel.
- Create an asyncio.Lock "send mutex" per channel, so concurrent callers can call publish() and basic_publish() (and all other methods) without interleaving frames. I haven't thought through whether there are negative consequences to this.
- Add assertions instead of an asyncio.Lock: if users end up calling publish() or basic_publish() while another publish on the same channel is still in progress, raise a RuntimeError with instructions on how to fix the calling code.
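The lock and assertion ideas above could look roughly like this. It is a sketch against a hypothetical channel, not a patch to aioamqp: `_BaseChannel`, `LockedChannel`, `GuardedChannel`, `_send_lock` and `_publishing` are names invented for illustration, and each frame write is again assumed to yield to the event loop.

```python
import asyncio

FRAME_KINDS = ("method", "header", "body")


class _BaseChannel:
    """Hypothetical channel that records frames; NOT aioamqp's real class."""

    def __init__(self):
        self.wire = []

    async def _write_frame(self, frame):
        self.wire.append(frame)
        await asyncio.sleep(0)  # assumption: each write yields to the loop


class LockedChannel(_BaseChannel):
    """Per-channel send mutex: concurrent callers are serialized."""

    def __init__(self):
        super().__init__()
        self._send_lock = asyncio.Lock()

    async def publish(self, name):
        async with self._send_lock:
            for kind in FRAME_KINDS:
                await self._write_frame((name, kind))


class GuardedChannel(_BaseChannel):
    """Assertion instead of a lock: overlapping publishes fail loudly."""

    def __init__(self):
        super().__init__()
        self._publishing = False

    async def publish(self, name):
        if self._publishing:
            raise RuntimeError(
                "publish() called while another publish is in progress; "
                "await each publish or use one channel per task"
            )
        self._publishing = True
        try:
            for kind in FRAME_KINDS:
                await self._write_frame((name, kind))
        finally:
            self._publishing = False


async def main():
    locked = LockedChannel()
    await asyncio.gather(locked.publish("A"), locked.publish("B"))

    guarded = GuardedChannel()
    results = await asyncio.gather(
        guarded.publish("A"), guarded.publish("B"), return_exceptions=True
    )
    return locked.wire, results


locked_wire, guarded_results = asyncio.run(main())
print([name for name, _ in locked_wire])
print(guarded_results)
```

With the lock, all of A's frames are written before any of B's. With the guard, the first publish completes and the overlapping one raises RuntimeError, so the caller finds out immediately instead of corrupting the channel.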
This relates to #145. (I don't think #145 should be merged, though, because writes should be async so buffers don't overflow. That's not the case today, which is fine for most people; but I don't think the API should "lock in" buffer overflows.)
My vote is to implement both the first and third ideas (docstrings and assertions). I'm willing to do this if the maintainers agree.