cpython
Add `asyncio.Queue.__aiter__`
Feature or enhancement
Proposal:
Over the last few years, Trio and AnyIO users have proven out several design patterns using channels as async iterables. For example, having a context manager yield an async iterable avoids the motivating problems of both PEP-533 and PEP-789.
An asyncio.Queue is almost identical to a channel pair, especially with the .shutdown() method added in Python 3.13. I therefore propose that we add an .__aiter__ method, to better support such design patterns without subclassing or a generator helper function, with an implementation as described in https://github.com/python/cpython/issues/119154#issuecomment-2120692346
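To make the intended usage concrete, here is a minimal sketch of the pattern on current Pythons. `IterQueue` is a hypothetical subclass, and a sentinel value stands in for `.shutdown()`/`QueueShutDown` so the sketch runs on versions before 3.13; under the proposal, plain `asyncio.Queue` plus `queue.shutdown()` would replace both:

```python
import asyncio

_DONE = object()  # sentinel standing in for queue.shutdown() on pre-3.13 Pythons

class IterQueue(asyncio.Queue):
    """Hypothetical subclass sketching the proposed ``async for item in queue``."""

    async def __aiter__(self):
        while True:
            item = await self.get()
            if item is _DONE:  # the real proposal would catch asyncio.QueueShutDown here
                return
            yield item

async def main():
    queue = IterQueue()
    for i in range(3):
        queue.put_nowait(i)
    queue.put_nowait(_DONE)  # with the proposal: queue.shutdown()
    return [item async for item in queue]

print(asyncio.run(main()))  # prints [0, 1, 2]
```

The consumer needs no sentinel check, no explicit `get()` loop, and no `try/except` around the shutdown signal; that is the ergonomic win the proposal is after.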
Links to previous discussion of this feature:
https://github.com/python/peps/pull/3782#pullrequestreview-2059307313 suggested that queue.Queue could also be iterable. If we're extending this to synchronous classes I'd also include multiprocessing.Queue and multiprocessing.SimpleQueue. I'd omit multiprocessing.connection.Connection, due to the byte-level send/recv methods, and queue.SimpleQueue because without a .close() or .shutdown() method there's no clean way to shut down.
Limitations
Making Queue aiterable reaches API parity for single-producer, single-consumer patterns. In multi-producer and/or multi-consumer patterns, without a .clone() method it is the user's responsibility to shut down the queue when the last task is done. I do not propose to add .clone(), but we could include that link in the docs as an option for multi-producer patterns if desired.
It should be:

```python
async def __aiter__(self):
    try:
        while True:
            yield await self.get()
    except asyncio.QueueShutDown:
        return
```
Or it might be even better to implement __anext__ and do:

```python
def __aiter__(self):
    return self
```
Probably should also have a task_done call.
I've included those suggestions in the top comment, sticking with __aiter__ because I don't see a clean way to support task_done() from __anext__ - and it feels more reasonable to describe queues as aiterables than aiterators.
I suspect the real problem is that the scope of the try/except is too large, and should only go around the await queue.get() part? (And you need s/queue/self/.)
Maybe this?

```python
async def __aiter__(self):
    while True:
        try:
            item = await self.get()
        except asyncio.QueueShutDown:
            return
        yield item
        self.task_done()
```
A philosophical question is, if yield item raises (i.e., the caller throws an exception into the generator), should task_done() be called or not?
Also, does it look like you're proposing a legit use case for async generators?
> Also, does it look like you're proposing a legit use case for async generators?
Yes! Generators (sync or async) are a really elegant syntax for defining iterables, and the problems motivating PEP-789 only occur if you yield (and thus suspend execution) within certain context managers. If you don't have a context manager in the generator function, and you're careful about finalization timing for PEP-533 reasons, I think they're great.
In general, you should always close an asynchronous generator. So the correct use of the proposed feature would be:
```python
async with contextlib.aclosing(aiter(queue)) as it:
    async for item in it:
        # process item
        ...
```

instead of simply:

```python
async for item in queue:
    # process item
    ...
```
We should also consider adding a method or a global function which returns an iterator instead of making Queue an iterable. That would allow emitting a warning if the iterator was not closed.
It is not clear what to do with task_done() if the iteration was stopped (exception or break/return) before exhausting the iterator.
Rather than using an async generator, you could use a class with an __anext__; then you don't need to worry about exceptions being thrown in, or about the async generator not being closed.
Like this:

```python
class AsyncQueueIterator:
    def __init__(self, queue):
        self.queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            item = await self.queue.get()
        except asyncio.QueueShutDown:
            raise StopAsyncIteration
        else:
            return item


class Queue:
    ...

    def __aiter__(self):
        return AsyncQueueIterator(self)
```
iter() has a two-argument form which allows you to specify a callable and the stop value. There have been many discussions about also allowing the stop exceptions to be specified. The same is applicable to aiter(). If it were implemented, you could simply write:
```python
async for item in aiter(queue.get, stop_exc=asyncio.QueueShutDown):
    # process item
    ...
```
Of course, such feature could be used in many other cases, not only with a queue.
There is another meaning of "iterating a queue":
```python
async for item in aiter(queue.get_nowait, stop_exc=(asyncio.QueueEmpty, asyncio.QueueShutDown)):
    # process item
    ...
```
With this idiom you have a choice. __aiter__ can only implement one of them.
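Neither aiter() signature exists today, but the semantics can be approximated with a small helper (aiter_stop is a hypothetical name, not a stdlib function). It accepts both the coroutine getter (queue.get) and the plain one (queue.get_nowait), so it covers both idioms above:

```python
import asyncio
import inspect

async def aiter_stop(getter, stop_exc):
    """Hypothetical stand-in for the proposed aiter(callable, stop_exc=...)."""
    while True:
        try:
            result = getter()
            if inspect.isawaitable(result):  # queue.get() returns a coroutine
                result = await result
        except stop_exc:
            return
        yield result

async def main():
    queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(i)
    # non-blocking drain: QueueEmpty doubles as the stop signal
    return [x async for x in aiter_stop(queue.get_nowait, asyncio.QueueEmpty)]

print(asyncio.run(main()))  # prints [0, 1, 2]
```

Passing queue.get with stop_exc=asyncio.QueueShutDown (Python 3.13+) would give the blocking variant with the same helper.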
I think the first implementation is the most useful; otherwise you need to create all tasks up front:

```
consumed everything # incorrect
0 -> queue
1 -> queue
2 -> queue
3 -> queue
4 -> queue
produced everything
```
Also, it isn't even asynchronous:

```python
class QueueIterator:
    def __init__(self, queue):
        self.queue = queue

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return self.queue.get_nowait()
        except (QueueEmpty, QueueShutDown):
            raise StopIteration
```
It's impossible to implement this asynchronously:

```pytb
  File "/Users/wannes/Documents/GitHub/cpython/add-asyncio.Queue.__aiter__/Lib/asyncio/queues.py", line 43, in __anext__
    return await self.queue.get_nowait()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object int can't be used in 'await' expression
```
Oh, right, get_nowait() is not asynchronous, so this would be a synchronous iterator. We could add asyncio.Queue.__iter__, but it would be very confusing, and it does not translate to synchronous queues.
So I propose reviving the old idea about the stop exception for iter() and aiter() (there may already be an open issue for this, maybe with a PR).
If after this we still want to make asyncio.Queue iterable, it can be implemented much more easily. But there are many other queue implementations in the stdlib. We should consider them and decide which of them to make iterable. Either make them all iterable (with the same semantics), or set a rule for why some queues are iterable and others are not.
I see, so queue.Queue would be an iterable, and asyncio.Queue an asynchronous iterable.
> If after this we still want to make asyncio.Queue iterable, it can be implemented much more easily. But there are many other queue implementations in the stdlib. We should consider them and decide which of them to make iterable. Either make them all iterable (with the same semantics), or set a rule for why some queues are iterable and others are not.
I think it's reasonable to make queue types (a)iterable independently of whether (a)iter supports customized stop exceptions - if nothing else, it's a much smaller feature. I'm not taking a position on whether it's worth making sync queue classes iterable, but if you decide yes, I did give a list of what I'd do for each in the top comment:
> python/peps#3782 (review) suggested that queue.Queue could also be iterable. If we're extending this to synchronous classes I'd also include multiprocessing.Queue and multiprocessing.SimpleQueue. I'd omit multiprocessing.connection.Connection, due to the byte-level send/recv methods, and queue.SimpleQueue because without a .close() or .shutdown() method there's no clean way to shut down.
I have made AsyncQueueIterator private. Then we can simplify it later without needing to go through the deprecation cycle.
Could you elaborate on how this would work for multiprocessing.Queue and multiprocessing.SimpleQueue?
> I have made AsyncQueueIterator private. Then we can simplify it later without needing to go through the deprecation cycle.
This change is being reviewed and discussed in https://github.com/python/cpython/pull/120491/files/0a8a72b59b42abc67c8084053bf3e4cf2259bbb7#r1645878198 to determine if this class is necessary.
@vstinner, can you close this? It'll be tracked further in the more general issue.
This issue has many people involved and many comments; I don't think that it should be closed.
I personally think the alternative solution is better as it provides all functionality and is consistent for both queues.
Additionally, it follows the naming convention of the other functions making it intuitive and discoverable by IDE suggestions.
But opinions vary, so if I can't convince you, we move further with just asyncio.Queue.__aiter__().
I also think many of the points rhettinger raised equally apply to asyncio.Queue.__aiter__():
> - With shutdown being new, it is better to keep it explicit.
> - Currently the get() is an obvious marker of where blocking occurs. Code will be harder to review with it hidden behind an abstraction.
> - Currently the get provides non-blocking [...] options which become inaccessible with this.
>
> Minor respellings typically don't benefit users. In this case, I think it makes them worse off by hiding two essential elements that should be explicit (the get call and catching the exception).
>
> I am clear that we don't have to do this. It only combines a while-get with a try/except. There is no new capability being added. It is perfectly reasonable to wait to see how people use shutdown().
>
> All the PR does is hide both a get() and a try/except [...] Hiding the two steps makes users worse off. Besides adding overhead, it hides a blocking call, which in my experience is something that needs to be explicit. [...] And as noted above, it distances the user from [...] non-blocking behaviors which typically need special handling distinct from the case of a shutdown. I also don't see how this would work well with code that wants to join() a queue where a worker needs to call task_done(). In short, users that want to do anything other than one specific case are made worse off by trying to use the for-loop variant. It won't even be obvious that they have options.
@nineteendo Please slow down and listen to the guidance being shared by core developers. When creating a new feature, opening a PR and being able to code the feature are only a small part of the development process.
First and foremost, the maintainers of the module need to consider the impact of adding a new feature to:
- the module itself
- the other parts of the language that interact with the new feature
- whether it breaks backward compatibility
- whether it changes the API that other projects depend upon
- how much cognitive load and complexity the new feature adds for users
To sum it up, new features are generally added slowly after considering the impacts.
Please also recognize that many core developers are volunteers who have many responsibilities. Being respectful of people's time is very important. Multiple requests, messages, and PRs will not move your PR forward faster.
> Please slow down
I wanted to quickly provide an alternative implementation before the current implementation gets merged. If I no longer get a response, I will wait until the PR is 1 month old before asking again.
> First and foremost, the maintainers of the module need to consider the impact of adding a new feature
That's precisely why new features first need to be discussed on Discourse, which Guido mentioned specifically. I don't understand why there's an exception for this suggestion. There's no mention of __aiter__ on Discourse after Guido's comment.
> Multiple requests, messages, and PRs will not move your PR forward faster.
It wasn't my intention to request a review again (I merged both PRs, as Zac suggested opening at most 2). Sorry for the chaos; I focused too much on the title of the issue being too narrow.
Take your time to evaluate; the PRs are drafts, so they can't be merged.
Hi folks. Is this the actual thread about the asyncio.Queue.__aiter__ feature? I have seen 3 other issues and 2 PRs from @nineteendo; each one mentions the others and was closed because of some comments, which gets a little messy.
I'm just wondering why it is so complicated to add this feature. It looks like an extension of the asyncio.Queue class and will not break backward compatibility or interact with anything else.
Anyway, making the queue async-iterable would be extra useful.
Yes, this is the issue for the 3 PRs. The other 2 issues were closed as a duplicate of this one.
Closing this issue as feature-declined, as Raymond describes in https://github.com/python/cpython/pull/120925#issuecomment-2370151879 (and earlier in https://github.com/python/cpython/pull/120503#issuecomment-2185001370).