
MemoryObjectStream can drop items when the receiving end is cancelled

Open agronholm opened this issue 9 months ago • 7 comments

Things to check first

  • [X] I have searched the existing issues and didn't find my bug already reported there

  • [X] I have checked that my bug is still present in the latest release

AnyIO version

4.3.0

Python version

3.8

What happened?

If a task (A) sends an item to another task (B) via a memory object stream while task B is waiting for an item but has a pending cancellation, the item is still delivered to B; but since the cancellation is then delivered to B as well, that item is essentially dropped on the floor.

A similar issue was reported in #146, but it seems that it wasn't fixed as thoroughly as I had hoped.

How can we reproduce the bug?

import anyio
from anyio import (
    CancelScope, create_memory_object_stream, create_task_group,
    wait_all_tasks_blocked
)

async def receiver(receive, task_status):
    with CancelScope() as cancel_scope:
        task_status.started(cancel_scope)
        await receive.receive()

async def main():
    send, receive = create_memory_object_stream(1)
    with send, receive:
        async with create_task_group() as tg:
            cancel_scope = await tg.start(receiver, receive)
            await wait_all_tasks_blocked()
            cancel_scope.cancel()
            send.send_nowait("item")

        assert receive.receive_nowait() == "item"

anyio.run(main)

The above snippet reproduces the problem (WouldBlock is raised) on both asyncio and Trio. On Trio, if create_memory_object_stream() is replaced with trio.open_memory_channel(), the snippet no longer raises an exception.

agronholm avatar May 09 '24 13:05 agronholm

did you mean to include that assert in your reproducer? I would expect assert receive.receive_nowait() == "item", if reachable, to raise WouldBlock:

  • receive_nowait() can only receive an item from a buffer or from a waiting sender, but in your reproducer there is neither: the stream is unbuffered and there are no await send() calls, only send_nowait() calls. (a small standalone illustration of this is included just after this list.)

  • looking at history, there's a test relevant to this: I think that your reproducer's assertion is asserting the same thing that test_cancel_during_receive_last_receiver did. test_cancel_during_receive_last_receiver was removed in a3af1da23c15af016af2656ebae79bd1580cbee3 because it was deemed incorrect. IIUC it was incorrect because closing an unbuffered memory object stream pair is supposed to be guaranteed to not drop items (there is no buffer, so there are no items to possibly drop). in the ~day between bd9a31063b4aa2f0e0848984651402724dd00f94 and a3af1da23c15af016af2656ebae79bd1580cbee3, AnyIO could push items beyond the buffer's limit, meaning that closing an unbuffered stream pair could incorrectly cause items to be dropped. but a3af1da23c15af016af2656ebae79bd1580cbee3 fixed this and removed the incorrect test.
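
a small standalone illustration of the first bullet above (separate from the reproducer; the names here are made up for this example): on an unbuffered stream, receive_nowait() raises WouldBlock unless a sender is blocked in send(), in which case the item is taken directly from that waiting sender.

import anyio
from anyio import (
    WouldBlock, create_memory_object_stream, create_task_group,
    wait_all_tasks_blocked
)

async def main():
    send, receive = create_memory_object_stream(0)  # unbuffered
    with send, receive:
        try:
            receive.receive_nowait()
        except WouldBlock:
            print("no buffered item and no waiting sender")

        async with create_task_group() as tg:
            # this send() blocks: the stream has no buffer and no waiting receiver
            tg.start_soon(send.send, "item")
            await wait_all_tasks_blocked()
            # now there is a waiting sender, so receive_nowait() can take its item
            assert receive.receive_nowait() == "item"

anyio.run(main)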

gschaffner avatar May 11 '24 10:05 gschaffner

if I remove that assertion from your reproducer, then I believe your reproducer demonstrates at least one bug, but possibly two:

  1. the bug of AnyIO dropping the item in your reproducer.

  2. the (possible) bug of send_nowait() raising WouldBlock when using trio_memory_channel.send_nowait() but not raising WouldBlock when using anyio_memory_stream.send_nowait().

    unlike (1), this does not cause items to be lost.

I think it's worth discussing these two things separately because (1) is the urgent issue (data loss), and it can be fixed first without doing the harder thing of also fixing (2). in more detail:

  1. this was a regression that was introduced in #595. #595 should not be reverted as a whole, as it fixed #536, but partially reverting it would fix (1) and prevent item loss: here's a PR for that: #729

  2. it's not clear to me at the moment if this is a bug or not. regarding the send_nowait() and await receive() calls, I'd expect one of two things to happen. either of these options would be fine in the sense that neither of them can cause AnyIO to drop items:

    1. send_nowait() returns success and await receive.receive() receives the item.

      this is what AnyIO did prior to #595, when (1) regressed.

      this is almost what AnyIO currently does, except that send_nowait() can go through while await receive.receive() raises cancelled, losing an item (i.e. (1)).

      this behavior can be explained as follows: although the await receive() call's scope gets its cancellation request before send_nowait(), there is no checkpoint between cancel_scope.cancel() and send_nowait(), so the send_nowait() call happens and succeeds before await receive() has a chance to react to the cancellation request. at that point, receive() has already received an item, so it's too late for it to raise cancelled (see also https://github.com/agronholm/anyio/pull/147#issuecomment-674615765). (a standalone illustration of this checkpoint behavior is included just after this list.)

    2. send_nowait() raises WouldBlock and await receive.receive() raises cancelled.

      this is the behavior when replacing the AnyIO memory stream with a Trio memory channel. I don't know what Trio's design reason for this is (assuming this was explicitly intentional), but the technical reason is that Trio memory channels are not implemented using Event; they're implemented using wait_task_rescheduled. specifically, https://github.com/agronholm/anyio/issues/146#issuecomment-673754182.

    question: is this behavior of Trio intentional/documented? i.e., does Trio document that (ii) will happen and (i) will not? or does Trio leave it undocumented, and it is just an implementation detail whether open_memory_channel is using Event or wait_task_rescheduled under the hood?

    in the case that this is deemed to be a bug (i.e. if Trio does document this behavior and so AnyIO should have identical behavior to Trio's documented behavior), then I agree with your gitter message that

    I think this could be solved if the sender could skip recipients that have been cancelled

    doing that in a backend agnostic manner is the tough part though :)

    I think that one backend-agnostic way to do this could be to implement anyio.lowlevel.wait_task_rescheduled and have memory streams use that instead of Event. (a rough, hypothetical sketch of this mechanism is included at the end of this comment.)

    also, in the case that this is deemed to be a bug, here's a test: e70b0e48e12aae8c6f120cfe18cd7c56c7ecccbb
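
as a standalone illustration of the checkpoint reasoning in (i) above (separate from the reproducer; the worker/reacted names are made up for this sketch): a task with a pending cancellation cannot react to it until it runs again, and it only runs again once the task currently holding the event loop hits a checkpoint, so purely synchronous code between cancel() and the next await always runs before the cancellation has any visible effect.

import anyio
from anyio import CancelScope, Event, create_task_group, wait_all_tasks_blocked

async def main():
    reacted = Event()

    async def worker(*, task_status):
        with CancelScope() as cancel_scope:
            task_status.started(cancel_scope)
            try:
                await anyio.sleep_forever()
            finally:
                # runs only once the cancellation is actually delivered to this task
                reacted.set()

    async with create_task_group() as tg:
        cancel_scope = await tg.start(worker)
        await wait_all_tasks_blocked()
        cancel_scope.cancel()
        # no checkpoint has happened since cancel(), so the worker has not run
        # again and therefore has not reacted to the cancellation yet
        assert not reacted.is_set()

    # the task group exit waited for the worker to unwind
    assert reacted.is_set()

anyio.run(main)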

(1) is the urgent problem (dropped items). since it's straightforward to fix, I think that it should be fixed first, and whether (2) is a bug or not (and, if so, how to resolve it in a backend-agnostic manner) can be figured out later (perhaps in a separate issue).
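
to make the wait_task_rescheduled idea above a bit more concrete, here is a rough, hypothetical sketch (Trio-only; this is not Trio's or AnyIO's actual implementation, and MiniChannel and everything in it is made up for illustration) of how an abort_fn lets a cancelled receiver withdraw itself from the waiter set, so that a later send_nowait() skips it instead of handing it an item that would then be dropped. with an Event-based implementation there is no equivalent hook, because the sender has already handed over the item by the time the cancelled receiver runs again.

from collections import OrderedDict, deque

import trio
from outcome import Value
from trio.testing import wait_all_tasks_blocked


class MiniChannel:
    """A stripped-down memory channel core, just enough for this scenario."""

    def __init__(self, max_buffer_size):
        self._max_buffer_size = max_buffer_size
        self._buffer = deque()
        self._waiting_receivers = OrderedDict()  # parked receiver tasks

    def send_nowait(self, value):
        if self._waiting_receivers:
            # hand the value directly to a parked receiver
            task, _ = self._waiting_receivers.popitem(last=False)
            trio.lowlevel.reschedule(task, Value(value))
        elif len(self._buffer) < self._max_buffer_size:
            self._buffer.append(value)
        else:
            raise trio.WouldBlock

    def receive_nowait(self):
        if self._buffer:
            return self._buffer.popleft()
        raise trio.WouldBlock

    async def receive(self):
        await trio.lowlevel.checkpoint_if_cancelled()
        try:
            value = self.receive_nowait()
        except trio.WouldBlock:
            pass
        else:
            await trio.lowlevel.cancel_shielded_checkpoint()
            return value

        task = trio.lowlevel.current_task()
        self._waiting_receivers[task] = None

        def abort_fn(raise_cancel):
            # the crucial part: on cancellation, withdraw from the waiter set so
            # that send_nowait() will never pick this (cancelled) receiver
            del self._waiting_receivers[task]
            return trio.lowlevel.Abort.SUCCEEDED

        return await trio.lowlevel.wait_task_rescheduled(abort_fn)


async def demo():
    channel = MiniChannel(1)

    async def receiver(*, task_status):
        with trio.CancelScope() as cancel_scope:
            task_status.started(cancel_scope)
            await channel.receive()

    async with trio.open_nursery() as nursery:
        cancel_scope = await nursery.start(receiver)
        await wait_all_tasks_blocked()
        # cancel() delivers the cancellation to the parked receiver right away,
        # running its abort_fn, so the send below finds no waiting receiver and
        # puts the item into the buffer instead
        cancel_scope.cancel()
        channel.send_nowait("item")

    assert channel.receive_nowait() == "item"

trio.run(demo)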

gschaffner avatar May 11 '24 10:05 gschaffner

Ah, my intention was to create a memory object stream with a buffer. Then the assert makes sense, yes?

agronholm avatar May 11 '24 19:05 agronholm

I've adjusted the snippet accordingly.

agronholm avatar May 11 '24 20:05 agronholm

Compare to the use of open_memory_channel():

import anyio
from anyio import (
    CancelScope, create_task_group,
    wait_all_tasks_blocked,
)
from trio import open_memory_channel

async def receiver(receive, task_status):
    with CancelScope() as cancel_scope:
        task_status.started(cancel_scope)
        await receive.receive()

async def main():
    send, receive = open_memory_channel(1)
    with send, receive:
        async with create_task_group() as tg:
            cancel_scope = await tg.start(receiver, receive)
            await wait_all_tasks_blocked()
            cancel_scope.cancel()
            send.send_nowait("item")

        assert receive.receive_nowait() == "item"

anyio.run(main, backend="trio")

agronholm avatar May 11 '24 20:05 agronholm

Ah, my intention was to create a memory object stream with a buffer. Then the assert makes sense, yes?

yes, agreed, assuming that your intention is to test for (2). but only (1) is needed to ensure that the stream isn't dropping items. (2) is a stricter criterion.

it's not clear to me whether (2) is actually a bug or otherwise problematic. what did you think about this part of my earlier comment:

question: is this behavior of Trio intentional/documented? i.e., does Trio document that (ii) will happen and (i) will not? or does Trio leave it undocumented, and it is just an implementation detail whether open_memory_channel is using Event or wait_task_rescheduled under the hood?

gschaffner avatar May 12 '24 06:05 gschaffner

I have just written some more tests that are also closely related, plus a closely related fix. (these tests are currently in branch #729 (2b095852be116199529228e8ce078d01a14b2f20).) here is a summary of all of these variations of #146 that that branch is currently testing for; I think this list makes a good summary of the relationships between these issues:

  • misbehavior when MemoryObjectReceiveStream.receive() is cancelled. the misbehavior is in the form of dropped items, i.e. neither the attempted sender nor the attempted receiver thinks it is responsible for the item.

    this is #146. it has two cases:

    • test_cancel_during_receive_after_send_nowait: this has been passing since 2020 (although it no longer supports native cancellations since #595, as mentioned in #595 (comment)).

    • test_cancel_during_receive_before_send_nowait: this is #728 (sense (1)). it had been passing since 2020 but started failing in #595.

  • misbehavior when MemoryObjectSendStream.send() is cancelled. the misbehavior is in the form of simultaneously-sent-and-not-sent items, i.e. both the attempted sender and the attempted receiver think they are responsible for the item.

    this issue was mentioned in #536:

    MemoryObjectSendStream.send(item) can raise cancelled after item has been delivered to a receiver! this MemoryObjectSendStream.send issue is just the send_event.wait() case to #146's receive_event.wait() case, i believe

    this also has two cases. they are analogous to #146's cases:

    • test_cancel_during_send_after_receive_nowait: this has been passing since #595.

    • test_cancel_during_send_before_receive_nowait: this is analogous to #728 (sense (1)). it is failing on master. (a rough sketch of this scenario is included at the end of this comment.)

  • misbehavior when Event.wait() is cancelled: there are also two cases here:

    • test_event_wait_before_set_before_cancel: this was #536; it was fixed by #595 (although it remains broken with native cancellations).

    • test_event_wait_before_cancel_before_set: this has been behaving correctly for a while.

      this is closely related to #728 (sense (2)), because this Event behavior is the reason why the regression #728 (sense (2)) happened. specifically, this Event behavior is why it's not possible to implement Trio's memory channel cancellation semantics (the behavior of its abort_fns) by using purely the cancellation semantics of Event.
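
for concreteness, here is a rough sketch of the send-side "before" scenario mentioned above (this is not necessarily what the actual test in that branch looks like; the sender helper and the final assertion are just one way to express the invariant that exactly one party must end up owning the item). if I read the description above correctly, on affected versions the final assertion fails because send() raises cancelled even though the receiver got the item.

import anyio
from anyio import (
    CancelScope, WouldBlock, create_memory_object_stream, create_task_group,
    wait_all_tasks_blocked
)

async def main():
    send, receive = create_memory_object_stream(1)
    send_completed = False
    with send, receive:
        send.send_nowait("first")  # fill the buffer so that send() below blocks

        async def sender(*, task_status):
            nonlocal send_completed
            with CancelScope() as cancel_scope:
                task_status.started(cancel_scope)
                await send.send("second")
                send_completed = True

        async with create_task_group() as tg:
            cancel_scope = await tg.start(sender)
            await wait_all_tasks_blocked()
            cancel_scope.cancel()
            assert receive.receive_nowait() == "first"
            try:
                second = receive.receive_nowait()
            except WouldBlock:
                second = None

        # "second" must be owned by exactly one party: either the receiver got it
        # and send() completed, or the receiver did not get it and send() was
        # cancelled before handing it over. dropped and duplicated items both
        # violate this.
        assert (second == "second") == send_completed

anyio.run(main)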

gschaffner avatar May 12 '24 06:05 gschaffner