MemoryObjectStream can drop items when the receiving end is cancelled
Things to check first
- [X] I have searched the existing issues and didn't find my bug already reported there
- [X] I have checked that my bug is still present in the latest release
AnyIO version
4.3.0
Python version
3.8
What happened?
If a task (A) sends an item to another task (B) via a memory object stream while B is waiting for an item and has a pending cancellation, the item is still handed to B. When the cancellation is then delivered to B, the item is effectively dropped on the floor.
A similar issue was reported in #146, but it seems that it wasn't fixed as thoroughly as I had hoped.
How can we reproduce the bug?
import anyio
from anyio import (
    CancelScope, create_memory_object_stream, create_task_group,
    wait_all_tasks_blocked
)


async def receiver(receive, task_status):
    with CancelScope() as cancel_scope:
        task_status.started(cancel_scope)
        await receive.receive()


async def main():
    send, receive = create_memory_object_stream(1)
    with send, receive:
        async with create_task_group() as tg:
            cancel_scope = await tg.start(receiver, receive)
            await wait_all_tasks_blocked()
            cancel_scope.cancel()
            send.send_nowait("item")

        assert receive.receive_nowait() == "item"


anyio.run(main)
The above snippet reproduces the problem (WouldBlock is raised) on both asyncio and Trio. On Trio, if create_memory_object_stream() is replaced with trio.open_memory_channel(), the snippet no longer raises an exception.
did you mean to include that assert in your reproducer? I would expect assert receive.receive_nowait() == "item", if reachable, to raise WouldBlock:
- receive_nowait() can only receive an item from a buffer or from a waiting sender, but in your reproducer there is neither: the stream is unbuffered and there are no await send() calls, only send_nowait() calls.
- looking at history, there's a test relevant to this: I think that your reproducer's assertion is asserting the same thing that test_cancel_during_receive_last_receiver did. test_cancel_during_receive_last_receiver was removed in a3af1da23c15af016af2656ebae79bd1580cbee3 because it was deemed incorrect. IIUC it was incorrect because closing an unbuffered memory object stream pair is supposed to be guaranteed to not drop items (there is no buffer, so there are no items to possibly drop). in the ~day between bd9a31063b4aa2f0e0848984651402724dd00f94 and a3af1da23c15af016af2656ebae79bd1580cbee3, AnyIO could push items beyond the buffer's limit, meaning that closing an unbuffered stream pair could incorrectly cause items to be dropped. but a3af1da23c15af016af2656ebae79bd1580cbee3 fixed this and removed the incorrect test.
if I remove that assertion from your reproducer, then I believe your reproducer demonstrates at least one bug, but possibly two:
1. the bug of AnyIO dropping the item in your reproducer.
2. the (possible) bug of send_nowait() raising WouldBlock when using trio_memory_channel.send_nowait() but not raising WouldBlock when using anyio_memory_stream.send_nowait(). unlike (1), this does not cause items to be lost.
I think it's worth discussing these two things separately because (1) is the urgent issue (data loss), and it can be fixed first without doing the harder thing of also fixing (2). in more detail:
1. this was a regression that was introduced in #595. #595 as a whole should not be totally reverted, as it fixed #536. but partially reverting it would fix (1) and prevent item loss. here's a PR for that: #729
2. it's not clear to me at the moment if this is a bug or not. regarding the send_nowait() and await receive() calls, I'd expect one of two things to happen. either of these options would be fine in the sense that neither of them can cause AnyIO to drop items:

   (i) send_nowait() returns success and await receive.receive() receives the item.

       this is what AnyIO did prior to #595, when (1) regressed. currently this is almost what AnyIO is doing, except that currently send_nowait() can go through while await receive.receive() raises cancelled, losing an item (i.e. (1)).

       this behavior can be explained as: while the await receive() call's scope gets a cancellation request before send_nowait(), there's no checkpoint between cancel_scope.cancel() and send_nowait(), so the send_nowait() call will happen and will succeed before await receive() has a chance to react to the request for cancellation. at that point, receive() has already received an item, so it's too late for it to raise cancelled (see also https://github.com/agronholm/anyio/pull/147#issuecomment-674615765).

   (ii) send_nowait() raises WouldBlock and await receive.receive() raises cancelled.

       this is the behavior when replacing the AnyIO memory stream with a Trio memory channel. I don't know what Trio's design reason for this is (assuming this was explicitly intentional), but the technical reason is that Trio memory channels are not implemented using Event; they're implemented using wait_task_rescheduled. specifically, see https://github.com/agronholm/anyio/issues/146#issuecomment-673754182.
question: is this behavior of Trio intentional/documented? i.e., does Trio document that (ii) will happen and (i) will not? or does Trio leave it undocumented, and it is just an implementation detail whether open_memory_channel is using Event or wait_task_rescheduled under the hood?

in the case that this is deemed to be a bug (i.e. if Trio does document this behavior and so AnyIO should have identical behavior to Trio's documented behavior), then i agree with your gitter message that
I think this could be solved if the sender could skip recipients that have been cancelled
doing that in a backend agnostic manner is the tough part though :)
I think that one backend-agnostic manner to do this could be to implement anyio.lowlevel.wait_task_rescheduled and have memory streams use that instead of Event.

also, in the case that this is deemed to be a bug, here's a test: e70b0e48e12aae8c6f120cfe18cd7c56c7ecccbb
3. (1) is the urgent problem (dropped items). since it's straightforward to fix, I think that it should be fixed first, and whether (2) is a bug or not (and, if so, how to resolve it in a backend-agnostic manner) can be figured out later (perhaps in a separate issue).
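To make the difference between the item-dropping outcome (1) and outcome (ii) concrete, here is a toy, fully synchronous model. It is not AnyIO's or Trio's implementation: ToyStream, Waiter, the skip_cancelled switch, deliver_cancellation() and scenario() are hypothetical names invented for this sketch, and the local WouldBlock class only stands in for anyio.WouldBlock. It models a receiver parked in receive() whose cancellation has been requested but not yet delivered, and compares a sender that hands the item over anyway with a sender that skips such receivers.

```python
from collections import deque


class WouldBlock(Exception):
    """Stand-in for anyio.WouldBlock (toy model only)."""


class Waiter:
    """A receiver parked in receive(), reduced to a plain record."""

    def __init__(self):
        self.cancel_requested = False  # requested, but not yet delivered
        self.items = []  # whatever a sender handed directly to this receiver


class ToyStream:
    def __init__(self, max_buffer_size, skip_cancelled):
        self.max_buffer_size = max_buffer_size
        self.skip_cancelled = skip_cancelled  # hypothetical policy switch
        self.buffer = deque()
        self.waiters = deque()

    def send_nowait(self, item):
        while self.waiters:
            waiter = self.waiters.popleft()
            if self.skip_cancelled and waiter.cancel_requested:
                continue  # this receiver will raise cancelled; skip it
            waiter.items.append(item)
            return
        if len(self.buffer) >= self.max_buffer_size:
            raise WouldBlock
        self.buffer.append(item)

    def receive_nowait(self):
        if not self.buffer:
            raise WouldBlock
        return self.buffer.popleft()


def deliver_cancellation(waiter):
    """Model the next checkpoint: the receiver raises cancelled.

    Returns the items that were handed to it and are therefore lost.
    """
    return waiter.items if waiter.cancel_requested else []


def scenario(skip_cancelled, max_buffer_size=1):
    """Replay the reproducer's order: park, cancel(), send_nowait(), checkpoint."""
    stream = ToyStream(max_buffer_size, skip_cancelled)
    waiter = Waiter()
    stream.waiters.append(waiter)   # task B is blocked in receive()
    waiter.cancel_requested = True  # cancel_scope.cancel()
    try:
        stream.send_nowait("item")  # no checkpoint happened in between
        sent = True
    except WouldBlock:
        sent = False
    lost = deliver_cancellation(waiter)
    return sent, lost, list(stream.buffer)


if __name__ == "__main__":
    print(scenario(skip_cancelled=False))
    print(scenario(skip_cancelled=True))
    print(scenario(skip_cancelled=True, max_buffer_size=0))
```

In this model, the first call reports a successful send whose item is lost, which is the shape of (1). The second call leaves the item in the buffer, so a later receive_nowait() would find it. The third call, with no buffer, makes send_nowait() raise WouldBlock, which is the shape of (ii). How a real backend would learn that a receiver has a pending cancellation is exactly the backend-agnostic difficulty raised above, and the sketch does not address it.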
Ah, my intention was to create a memory object stream with a buffer. Then the assert makes sense, yes?
I've adjusted the snippet accordingly.
Compare to the use of open_memory_channel():
import anyio
from anyio import (
    CancelScope, create_task_group,
    wait_all_tasks_blocked,
)
from trio import open_memory_channel


async def receiver(receive, task_status):
    with CancelScope() as cancel_scope:
        task_status.started(cancel_scope)
        await receive.receive()


async def main():
    send, receive = open_memory_channel(1)
    with send, receive:
        async with create_task_group() as tg:
            cancel_scope = await tg.start(receiver, receive)
            await wait_all_tasks_blocked()
            cancel_scope.cancel()
            send.send_nowait("item")

        assert receive.receive_nowait() == "item"


anyio.run(main, backend="trio")
Ah, my intention was to create a memory object stream with a buffer. Then the assert makes sense, yes?
yes, agreed, assuming that your intention is to test for (2). but only (1) is needed to ensure that the stream isn't dropping items. (2) is a stricter criterion.
it's not clear to me whether (2) is actually a bug or problematic. what did you think about
question: is this behavior of Trio intentional/documented? i.e., does Trio document that (ii) will happen and (i) will not? or does Trio leave it undocumented, and it is just an implementation detail whether open_memory_channel is using Event or wait_task_rescheduled under the hood?
i have just written some more tests that are also closely related, plus a closely related fix. (these tests are currently in branch #729 (2b095852be116199529228e8ce078d01a14b2f20).) here is a summary of all of these variations of #146 that that branch is currently testing for (I think that this list makes a good summary of the relationships between these issues):
- misbehavior when MemoryObjectReceiveStream.receive() is cancelled. the misbehavior is in the form of dropped items, i.e. neither the attempted sender nor the attempted receiver think they are responsible for the item.

  this is #146. it has two cases:

  - test_cancel_during_receive_after_send_nowait: this has been passing since 2020 (although it no longer supports native cancellations since #595, as mentioned in #595 (comment)).
  - test_cancel_during_receive_before_send_nowait: this is #728 (sense (1)). it had been passing since 2020 but started failing in #595.
- misbehavior when MemoryObjectSendStream.send() is cancelled. the misbehavior is in the form of simultaneously-sent-and-not-sent items, i.e. both the attempted sender and the attempted receiver think they are responsible for the item.

  this issue was mentioned in #536:

  MemoryObjectSendStream.send(item) can raise cancelled after item has been delivered to a receiver! this MemoryObjectSendStream.send issue is just the send_event.wait() case to #146's receive_event.wait() case, i believe

  this also has two cases. they are analogous to #146's cases:
- misbehavior when Event.wait() is cancelled: there are also two cases here:

  - test_event_wait_before_set_before_cancel: this was #536; it was fixed by #595 (although it remains broken with native cancellations).
  - test_event_wait_before_cancel_before_set: this has been behaving correctly for a while.

    this is closely related to #728 (sense (2)), because this Event behavior is the reason why the regression #728 (sense (2)) happened. specifically, this Event behavior is why it's not possible to implement Trio's memory channel cancellation semantics (the behavior of its abort_fns) by using purely the cancellation semantics of Event.
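The "wait, then set, then cancel" ordering from the Event.wait() cases can be illustrated with plain asyncio, outside AnyIO. The sketch below is only an analogy: it uses asyncio.Event and Task.cancel() (asyncio's native cancellation) rather than AnyIO events and cancel scopes, the names wait_set_cancel and waiter are made up for this example, and it does not reproduce the AnyIO tests named above.

```python
import asyncio


async def wait_set_cancel():
    event = asyncio.Event()
    outcome = []

    async def waiter():
        try:
            await event.wait()
            outcome.append("woke up normally")
        except asyncio.CancelledError:
            outcome.append("cancelled")
            raise

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # let the waiter block in event.wait()

    event.set()    # the wake-up is now scheduled ...
    task.cancel()  # ... but cancellation is requested before the waiter runs

    try:
        await task
    except asyncio.CancelledError:
        pass
    return outcome, event.is_set()


if __name__ == "__main__":
    print(asyncio.run(wait_set_cancel()))
```

Here the waiter observes a cancellation even though the event was already set when it resumed. If an item handoff is layered on top of an event like this, the side that called set() can believe the handoff completed while the waiting side sees only the cancellation, which is the kind of disagreement about who owns the item that the list above describes.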