cpython icon indicating copy to clipboard operation
cpython copied to clipboard

asyncio.Queue: putting items out of order when it is full

Open 795e4d41-a79c-41f9-90e2-743bead692c4 opened this issue 6 years ago • 6 comments

BPO 38874
Nosy @asvetlov, @1st1, @MojoVampire, @rhdxmr
Files
  • poc-oooput.py
  • Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.

    Show more details

    GitHub fields:

    assignee = None
    closed_at = None
    created_at = <Date 2019-11-21.03:25:54.744>
    labels = ['3.7', 'expert-asyncio']
    title = 'asyncio.Queue: putting items out of order when it is full'
    updated_at = <Date 2019-11-27.03:57:51.152>
    user = 'https://github.com/rhdxmr'
    

    bugs.python.org fields:

    activity = <Date 2019-11-27.03:57:51.152>
    actor = 'josh.r'
    assignee = 'none'
    closed = False
    closed_date = None
    closer = None
    components = ['asyncio']
    creation = <Date 2019-11-21.03:25:54.744>
    creator = 'esrse'
    dependencies = []
    files = ['48740']
    hgrepos = []
    issue_num = 38874
    keywords = []
    message_count = 6.0
    messages = ['357129', '357130', '357132', '357166', '357247', '357553']
    nosy_count = 4.0
    nosy_names = ['asvetlov', 'yselivanov', 'josh.r', 'esrse']
    pr_nums = []
    priority = 'normal'
    resolution = None
    stage = None
    status = 'open'
    superseder = None
    type = None
    url = 'https://bugs.python.org/issue38874'
    versions = ['Python 3.7']
    

    Hi

    The python document says that asyncio.Queue is FIFO queue. But it is not true when the queue is full and multi tasks are putting items into it simultaneously. I know the document does not explicitly mention that asyncio.Queue is multi producer queue, but also there does not exist any note that it may be corrupt if used by multi producers.

    I found the below scenario happens when the asyncio.Queue is full and multi tasks are putting into it simultaneously. Let me explain this.

    When a queue is full, putters are awaiting here: https://github.com/python/cpython/blob/d67279147ace3b63187e5d75a15c345264f39e85/Lib/asyncio/queues.py#L125

    And the first of them is waken up here: https://github.com/python/cpython/blob/d67279147ace3b63187e5d75a15c345264f39e85/Lib/asyncio/queues.py#L188

    Before the first putter hands over the control of execution, new putter calls the .put() method so the queue is not full at this point that it calls .put_nowait() immediately here: https://github.com/python/cpython/blob/d67279147ace3b63187e5d75a15c345264f39e85/Lib/asyncio/queues.py#L140

    If this happens the new putter puts earlier than other awaiting putters.

    I hope the queue gets fixed to work correctly in this case.

    Thanks,

    Junyeong

    The items that haven't finished the put aren't actually "in" the queue yet, so I don't see how non-FIFO order of insertion violates any FIFO guarantees for the contents of the queue; until the items are actually "in", they're not sequenced for the purposes of when they come "out". Mandating such a guarantee effectively means orchestrating a queue with a real maxsize equal to the configured maxsize plus the total number of coroutines competing to put items into it.

    The guarantee is still being met here; once an item is put, it will be "get"-ed after anything that finished put-ing before it, and before anything that finished put-ing after it.

    The items that haven't finished the put aren't actually "in" the queue yet, so I don't see how non-FIFO order of insertion violates any FIFO guarantees for the contents of the queue; until the items are actually "in", they're not sequenced for the purposes of when they come "out". Mandating such a guarantee effectively means orchestrating a queue with a real maxsize equal to the configured maxsize plus the total number of coroutines competing to put items into it.

    Your explanation is correct technically. But without enough explanation, this behavior can confuse many users.

    For instance it happens to put data into asyncio.Queue at asyncio protocol callbacks by creating new task. This logic will work nicely for a long time but in the corner case the queue gets full and at the point of view of a consumer the data come out from the queue are out of order.

    IMHO, it would be better to add some note into the document, describing what FIFO means in regard to asyncio.Queue.

    I still don't understand the problem. If the queue is full new items still are added in the order of await q.put() calls. If there are multiple producers the order of adding items into the queue is still the order of q.put().

    Do you have a code example that demonstrates the opposite behavior?

    asvetlov avatar Nov 21 '19 13:11 asvetlov

    Thanks for having an interest in this issue.

    I thought asyncio.Queue guarantees the order of items as .put called after I read the code. But it does not.

    I attach poc code here.
    In my machine, this program prints this message and exits.
    $ python poc-oooput.py
    num=1, last_read_num=0
    num=33, last_read_num=1
    Traceback (most recent call last):
      File "poc-oooput.py", line 47, in <module>
        asyncio.run(main())
      File "/home/esrse/.pyenv/versions/3.7.3/lib/python3.7/asyncio/runners.py", line 43, in run
        return loop.run_until_complete(main)
      File "/home/esrse/.pyenv/versions/3.7.3/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
        return future.result()
      File "poc-oooput.py", line 44, in main
        await stream.read()
      File "poc-oooput.py", line 20, in read
        assert num == self._last_read_num + 1
    AssertionError
    

    This shows you that the 33rd item is put before the 2nd item.

    Yes, five outstanding blocked puts can be bypassed by a put that comes in immediately after a get creates space. But this isn't really a problem; there are no guarantees on what order puts are executed in, only a guarantee that once a put succeeds, it's FIFO ordered with respect to all other puts.

    Nothing in the docs even implies the behavior you're expecting, so I'm not seeing how even a documentation fix is warranted here. The docs on put clearly say:

    "Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item."

    If we forcibly hand off on put even when a slot is available (to allow older puts to finish first), then we violate the expectation that waiting is only performed when the queue is full (if I test myqueue.full() and it returns False, I can reasonably expect that put won't block). This would be especially impossible to fix if people write code like if not myqueue.full(): myqueue.put_nowait(). put_nowait isn't even a coroutine, so it can't hand off control to the event loop to allow waiting puts to complete, even if it wanted to, and it can't fail to put (e.g. by determining the empty slots will be filled by outstanding puts in some relatively expensive way), because you literally just verified the queue wasn't full and had no awaits between the test and the put_nowait, so it must succeed.

    In short: Yes, it's somewhat unpleasant that a queue slot can become free and someone else can swoop in and steal it before older waiting puts can finish. But any change that "fixed" that would make all code slower (forcing unnecessary coroutine switches), and violate existing documentation guarantees.

    But any change that "fixed" that would make all code slower (forcing unnecessary coroutine switches), and violate existing documentation guarantees.

    This issue should be closed then, right?

    nmuise-barclay avatar Nov 18 '22 19:11 nmuise-barclay

    Hmm… We managed to fix this for semaphores, which are documented to be fair. Queues aren’t. But maybe the OP is expecting fairness? @rhdxmr

    gvanrossum avatar Nov 18 '22 19:11 gvanrossum

    But any change that "fixed" that would make all code slower (forcing unnecessary coroutine switches), and violate existing documentation guarantees

    I don't really agree with the first statement above because the main problem is how to handle waiting and moving tasks with the Queue._putters deque.

    Here is an a very simple example to point out this use case:

    async def simplier_put_case():
        q = asyncio.Queue(1)
        for n in range(3):
            asyncio.create_task(q.put(n:=n+1))
            print(f"PUT: {n}")
        await asyncio.sleep(0.0)
    
        # queue full: 1 item in queue, 2 items waiting into the _putters
        print(f'{str(q) = }/{q.full() = }')
        print(f'GET: {await q.get()}')
        # queue not full: 1 item out of queue, 1 item waiting in the _putters, the last 1 out of _putters moving into the queue
        print(f'{str(q) = }/{q.full() = }')
    
        # steal the free solt
        print(f"PUT_NOWAIT: {q.put_nowait(n:=n+1) or n}")
        print(f'{str(q) = }/{q.full() = }')
    
        # get all items
        while (n:=n-1):
            print(f'GET: {await q.get()}')
        print(f'END {(str(q)) = }/{q.full() = }')
    
    asyncio.run(simplier_put_case())
    

    Ouput is:

    PUT: 1
    PUT: 2
    PUT: 3
    str(q) = '<Queue maxsize=1 _queue=[1] _putters[2] tasks=1>'/q.full() = True
    GET: 1
    str(q) = '<Queue maxsize=1 _putters[1] tasks=1>'/q.full() = False
    PUT_NOWAIT: 4
    str(q) = '<Queue maxsize=1 _queue=[4] _putters[1] tasks=2>'/q.full() = True
    GET: 4
    GET: 2
    GET: 3
    END (str(q)) = '<Queue maxsize=1 tasks=4>'/q.full() = False
    

    To avoid stealing a free slot, when entering in Queue.put or Queue.put_nowait, moreover to check that queue is not full, we have also to check that Queue._putters is not empty. This could indicate that there are pending task(s) or at least one item moving to a free slot. To make that possible, we have to modify the Queue._wakeup_next method. I suggest to leave the first 'not done' future in the Queue._putters deque, and remove it only in the Queue.put method, just after the await putter.

    Having said that, the fix seems simple.

    • preserve a 'Queue._putters' containing all pending or moving items (via their attached future). We have to move the removing of a future from _wakeup_next to Queue.put method.
    • prevent stealing a empty slot
      • in the Queue.put_nowait method, raising a new exception as QFullWithPendingOrMovingItems when Queue._putters is not empty,
      • in the Queue.put method, adding a new item in the queue only when the queue is not full and Queue._putters is empty. otherwise append it into Queue._putters deque.

    Finally, I am wondering if it should simplier to handle pending items with a Semaphore than a deque.

    YvesDup avatar Nov 20 '23 17:11 YvesDup

    @YvesDup Can you phrase that suggestion in the form of a PR? I really don't want to try and interpret your four bullet points into working code plus thorough tests. You seem to have thought about this a fair deal so I expect you have everything readily available in your mind to write that PR.

    gvanrossum avatar Nov 20 '23 21:11 gvanrossum

    @gvanrossum PR is ready. Please could you take a look ? Thank for your comments.

    YvesDup avatar Dec 20 '23 14:12 YvesDup

    I am probably going to slow roll the PR, sorry. It's not that I am really against the idea of making Queue fair, it's that the change is subtle and fairly involved (witness the number of tests added!), and there is not consensus here that there is even a problem. I realize I asked for a PR, but that was in part to get you to quantify your claim "the fix seems simple". It doesn't seem simple to me -- in fact, in my experience, nothing in the area of synchronization primitives is ever simple (count how many times we had to redo Lock and Semaphore), and the size of your PR proves it.

    Maybe some other core dev cares more about this issue, but I need to limit the things I work on to a reasonable amount otherwise I wouldn't be able to sleep (I've got chronic insomnia already). Sorry!

    gvanrossum avatar Dec 24 '23 02:12 gvanrossum

    I understand your point of view, which seems very reasonable to me. This is not easy at all, you are right. And above all I wouldn't want these issue and PR to contribute to our insomnia. For my part, I will push a updated version which will be my last contribution on this issue. Happy end of year !!

    YvesDup avatar Dec 24 '23 17:12 YvesDup

    After see it the complexity of the solution I think we should not try to make the Queue class “fair” in this sense.

    gvanrossum avatar Jan 24 '24 17:01 gvanrossum