asyncio.Queue: putting items out of order when it is full
| BPO | 38874 |
|---|---|
| Nosy | @asvetlov, @1st1, @MojoVampire, @rhdxmr |
| Files | poc-oooput.py |
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.
GitHub fields:
assignee = None
closed_at = None
created_at = <Date 2019-11-21.03:25:54.744>
labels = ['3.7', 'expert-asyncio']
title = 'asyncio.Queue: putting items out of order when it is full'
updated_at = <Date 2019-11-27.03:57:51.152>
user = 'https://github.com/rhdxmr'
bugs.python.org fields:
activity = <Date 2019-11-27.03:57:51.152>
actor = 'josh.r'
assignee = 'none'
closed = False
closed_date = None
closer = None
components = ['asyncio']
creation = <Date 2019-11-21.03:25:54.744>
creator = 'esrse'
dependencies = []
files = ['48740']
hgrepos = []
issue_num = 38874
keywords = []
message_count = 6.0
messages = ['357129', '357130', '357132', '357166', '357247', '357553']
nosy_count = 4.0
nosy_names = ['asvetlov', 'yselivanov', 'josh.r', 'esrse']
pr_nums = []
priority = 'normal'
resolution = None
stage = None
status = 'open'
superseder = None
type = None
url = 'https://bugs.python.org/issue38874'
versions = ['Python 3.7']
Hi
The Python documentation says that asyncio.Queue is a FIFO queue, but that is not true when the queue is full and multiple tasks are putting items into it simultaneously. I know the documentation does not explicitly say that asyncio.Queue is a multi-producer queue, but there is also no note warning that ordering may break when it is used by multiple producers.
I found that the scenario below happens when the asyncio.Queue is full and multiple tasks are putting into it simultaneously. Let me explain.
When a queue is full, putters are awaiting here: https://github.com/python/cpython/blob/d67279147ace3b63187e5d75a15c345264f39e85/Lib/asyncio/queues.py#L125
And the first of them is woken up here: https://github.com/python/cpython/blob/d67279147ace3b63187e5d75a15c345264f39e85/Lib/asyncio/queues.py#L188
Before that first putter regains control of execution, a new putter calls the .put() method; since the queue is not full at that moment, it calls .put_nowait() immediately here: https://github.com/python/cpython/blob/d67279147ace3b63187e5d75a15c345264f39e85/Lib/asyncio/queues.py#L140
When this happens, the new putter's item goes into the queue before the items of the putters that were already waiting.
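A minimal sketch of that interleaving might look like the following (a hypothetical illustration, not the attached PoC; with the queue logic described above it prints A, C, B):

```python
import asyncio

async def demo():
    q = asyncio.Queue(maxsize=1)
    q.put_nowait("A")                           # the queue is now full
    blocked = asyncio.create_task(q.put("B"))   # this put has to wait in q._putters
    await asyncio.sleep(0)                      # let the task start and block
    print(await q.get())                        # "A": frees the slot and wakes the
                                                # blocked putter, which has not run yet
    q.put_nowait("C")                           # a newcomer steals the freed slot
    print(await q.get())                        # "C" comes out before "B"
    await blocked                               # only now can the older put finish
    print(await q.get())                        # "B"

asyncio.run(demo())
```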
I hope the queue gets fixed to work correctly in this case.
Thanks,
Junyeong
The items that haven't finished the put aren't actually "in" the queue yet, so I don't see how non-FIFO order of insertion violates any FIFO guarantees for the contents of the queue; until the items are actually "in", they're not sequenced for the purposes of when they come "out". Mandating such a guarantee effectively means orchestrating a queue with a real maxsize equal to the configured maxsize plus the total number of coroutines competing to put items into it.
The guarantee is still being met here; once an item is put, it will be "get"-ed after anything that finished put-ing before it, and before anything that finished put-ing after it.
> The items that haven't finished the put aren't actually "in" the queue yet, so I don't see how non-FIFO order of insertion violates any FIFO guarantees for the contents of the queue; until the items are actually "in", they're not sequenced for the purposes of when they come "out". Mandating such a guarantee effectively means orchestrating a queue with a real maxsize equal to the configured maxsize plus the total number of coroutines competing to put items into it.
Your explanation is technically correct, but without enough explanation in the documentation this behavior can confuse many users.
For instance, a common pattern is to put data into an asyncio.Queue from asyncio protocol callbacks by creating a new task for each put. That logic will work nicely for a long time, but in the corner case where the queue becomes full, from the consumer's point of view the data come out of the queue out of order.
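A hedged sketch of that pattern (hypothetical protocol and names, not from the original report):

```python
import asyncio

class ForwardingProtocol(asyncio.Protocol):
    """Hypothetical protocol that forwards received chunks to a queue."""

    def __init__(self, queue):
        self._queue = queue

    def data_received(self, data):
        # A protocol callback cannot await, so each chunk is handed to the
        # queue via a new task. While the queue has room this preserves
        # arrival order, but once the queue fills up, waiting puts can be
        # overtaken by later ones and the consumer sees chunks out of order.
        asyncio.create_task(self._queue.put(data))
```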
IMHO, it would be better to add a note to the documentation describing what FIFO means with regard to asyncio.Queue.
I still don't understand the problem.
If the queue is full, new items are still added in the order of the await q.put() calls.
If there are multiple producers, the order in which items are added to the queue is still the order of the q.put() calls.
Do you have a code example that demonstrates the opposite behavior?
Thanks for taking an interest in this issue.
After reading the code, I thought asyncio.Queue guarantees that items come out in the order .put() was called, but it does not.
I attach the PoC code here.
On my machine, this program prints the following messages and exits.
$ python poc-oooput.py
num=1, last_read_num=0
num=33, last_read_num=1
Traceback (most recent call last):
File "poc-oooput.py", line 47, in <module>
asyncio.run(main())
File "/home/esrse/.pyenv/versions/3.7.3/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/home/esrse/.pyenv/versions/3.7.3/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "poc-oooput.py", line 44, in main
await stream.read()
File "poc-oooput.py", line 20, in read
assert num == self._last_read_num + 1
AssertionError
This shows that the 33rd item was put before the 2nd item.
Yes, five outstanding blocked puts can be bypassed by a put that comes in immediately after a get creates space. But this isn't really a problem; there are no guarantees on what order puts are executed in, only a guarantee that once a put succeeds, it's FIFO ordered with respect to all other puts.
Nothing in the docs even implies the behavior you're expecting, so I'm not seeing how even a documentation fix is warranted here. The docs on put clearly say:
"Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item."
If we forcibly hand off on put even when a slot is available (to allow older puts to finish first), then we violate the expectation that waiting is only performed when the queue is full (if I test myqueue.full() and it returns False, I can reasonably expect that put won't block). This would be especially impossible to fix if people write code like if not myqueue.full(): myqueue.put_nowait(). put_nowait isn't even a coroutine, so it can't hand off control to the event loop to allow waiting puts to complete, even if it wanted to, and it can't fail to put (e.g. by determining the empty slots will be filled by outstanding puts in some relatively expensive way), because you literally just verified the queue wasn't full and had no awaits between the test and the put_nowait, so it must succeed.
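For illustration, the check-then-put pattern being described (a hypothetical helper, not code from this discussion):

```python
import asyncio

async def try_put(queue: asyncio.Queue, item) -> bool:
    # There is no await between the check and the put, so nothing else can
    # run in between: put_nowait() must succeed here even while other
    # coroutines are blocked inside queue.put() waiting for a slot.
    if not queue.full():
        queue.put_nowait(item)
        return True
    return False
```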
In short: Yes, it's somewhat unpleasant that a queue slot can become free and someone else can swoop in and steal it before older waiting puts can finish. But any change that "fixed" that would make all code slower (forcing unnecessary coroutine switches), and violate existing documentation guarantees.
> But any change that "fixed" that would make all code slower (forcing unnecessary coroutine switches), and violate existing documentation guarantees.
This issue should be closed then, right?
Hmm… We managed to fix this for semaphores, which are documented to be fair. Queues aren’t. But maybe the OP is expecting fairness? @rhdxmr
> But any change that "fixed" that would make all code slower (forcing unnecessary coroutine switches), and violate existing documentation guarantees
I don't really agree with the first statement above, because the main problem is how waiting and moving tasks are handled with the Queue._putters deque.
Here is a very simple example that illustrates this case:
```python
import asyncio

async def simplier_put_case():
    q = asyncio.Queue(1)
    for n in range(3):
        asyncio.create_task(q.put(n := n + 1))
        print(f"PUT: {n}")
        await asyncio.sleep(0.0)
    # queue full: 1 item in the queue, 2 items waiting in _putters
    print(f'{str(q) = }/{q.full() = }')
    print(f'GET: {await q.get()}')
    # queue not full: 1 item out of the queue, 1 item still waiting in _putters,
    # the other one popped from _putters and moving into the queue
    print(f'{str(q) = }/{q.full() = }')
    # steal the free slot
    print(f"PUT_NOWAIT: {q.put_nowait(n := n + 1) or n}")
    print(f'{str(q) = }/{q.full() = }')
    # get all remaining items
    while (n := n - 1):
        print(f'GET: {await q.get()}')
    print(f'END {(str(q)) = }/{q.full() = }')

asyncio.run(simplier_put_case())
```
Output is:
```
PUT: 1
PUT: 2
PUT: 3
str(q) = '<Queue maxsize=1 _queue=[1] _putters[2] tasks=1>'/q.full() = True
GET: 1
str(q) = '<Queue maxsize=1 _putters[1] tasks=1>'/q.full() = False
PUT_NOWAIT: 4
str(q) = '<Queue maxsize=1 _queue=[4] _putters[1] tasks=2>'/q.full() = True
GET: 4
GET: 2
GET: 3
END (str(q)) = '<Queue maxsize=1 tasks=4>'/q.full() = False
```
To avoid stealing a free slot, when entering Queue.put or Queue.put_nowait, in addition to checking that the queue is not full, we also have to check whether Queue._putters is empty: a non-empty Queue._putters indicates that there are pending tasks, or at least one item already moving toward a free slot.
To make that possible, we have to modify the Queue._wakeup_next method. I suggest leaving the first 'not done' future in the Queue._putters deque and removing it only in the Queue.put method, just after the await putter.
Having said that, the fix seems simple.
- preserve a `Queue._putters` deque containing all pending or moving items (via their attached future). We have to move the removal of a future from `_wakeup_next` to the `Queue.put` method.
- prevent stealing an empty slot:
  - in the `Queue.put_nowait` method, raise a new exception such as `QFullWithPendingOrMovingItems` when `Queue._putters` is not empty,
  - in the `Queue.put` method, add a new item to the queue only when the queue is not full and `Queue._putters` is empty; otherwise append it to the `Queue._putters` deque.
Finally, I am wondering whether it would be simpler to handle pending items with a Semaphore rather than a deque.
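As a rough illustration of that last idea, here is a hedged sketch (hypothetical, not the actual PR); it assumes a Semaphore that wakes waiters in FIFO order, as with the semaphore fairness fix mentioned earlier in the thread:

```python
import asyncio

class FairBoundedQueue:
    """Sketch: an unbounded asyncio.Queue whose capacity is enforced by a
    Semaphore instead of the _putters deque. Every put() must first obtain
    a slot permit, so as long as the Semaphore hands permits out in arrival
    order, a late put() cannot overtake producers that are already waiting."""

    def __init__(self, maxsize):
        self._items = asyncio.Queue()             # unbounded; capacity is the Semaphore's job
        self._slots = asyncio.Semaphore(maxsize)

    async def put(self, item):
        await self._slots.acquire()               # wait for a free slot, in arrival order
        self._items.put_nowait(item)              # cannot fail: we hold a permit

    async def get(self):
        item = await self._items.get()
        self._slots.release()                     # hand the freed slot to the oldest waiter
        return item
```

This is only meant to show why a Semaphore-based formulation looks attractive; the actual proposal modifies _wakeup_next and _putters as described in the bullets above.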
@YvesDup Can you phrase that suggestion in the form of a PR? I really don't want to try and interpret your four bullet points into working code plus thorough tests. You seem to have thought about this a fair deal so I expect you have everything readily available in your mind to write that PR.
@gvanrossum The PR is ready. Could you please take a look? Thanks for your comments.
I am probably going to slow roll the PR, sorry. It's not that I am really against the idea of making Queue fair, it's that the change is subtle and fairly involved (witness the number of tests added!), and there is not consensus here that there is even a problem. I realize I asked for a PR, but that was in part to get you to quantify your claim "the fix seems simple". It doesn't seem simple to me -- in fact, in my experience, nothing in the area of synchronization primitives is ever simple (count how many times we had to redo Lock and Semaphore), and the size of your PR proves it.
Maybe some other core dev cares more about this issue, but I need to limit the things I work on to a reasonable amount otherwise I wouldn't be able to sleep (I've got chronic insomnia already). Sorry!
I understand your point of view, which seems very reasonable to me. This is not easy at all, you are right. And above all I wouldn't want this issue and PR to contribute to our insomnia. For my part, I will push an updated version, which will be my last contribution on this issue. Happy end of year!
After seeing the complexity of the solution, I think we should not try to make the Queue class "fair" in this sense.