
Add `asyncio.Queue.__aiter__`

Zac-HD opened this issue 1 year ago

Feature or enhancement

Proposal:

Over the last few years, Trio and AnyIO users have proven out several design patterns using channels as async iterables. For example, having a context manager yield an async iterable avoids the motivating problems of both PEP-533 and PEP-789.

An asyncio.Queue is almost identical to a channel-pair, especially with the .shutdown() method added in Python 3.13. I therefore propose that we add an .__aiter__ method, to better support such design patterns without subclassing or a generator helper function, with an implementation as described in https://github.com/python/cpython/issues/119154#issuecomment-2120692346
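
For concreteness, the intended usage would look roughly like the following sketch (it assumes the proposed __aiter__; shutdown() is the method added in Python 3.13):

import asyncio

async def main():
    queue = asyncio.Queue()

    async def producer():
        for i in range(5):
            await queue.put(i)
        queue.shutdown()  # ends iteration once the queue drains

    async def consumer():
        # the proposed feature: iterate until the queue is shut down
        async for item in queue:
            print("got", item)

    await asyncio.gather(producer(), consumer())

asyncio.run(main())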

Links to previous discussion of this feature:

https://github.com/python/peps/pull/3782#pullrequestreview-2059307313 suggested that queue.Queue could also be iterable. If we're extending this to synchronous classes I'd also include multiprocessing.Queue and multiprocessing.SimpleQueue. I'd omit multiprocessing.connection.Connection, due to the byte-level send/recv methods, and queue.SimpleQueue because without a .close() or .shutdown() method there's no clean way to shut down.

Limitations

Making Queue aiterable reaches API parity for single-producer, single-consumer patterns. In multi-producer and/or multi-consumer patterns, without a .clone() method it is the user's responsibility to shut down the queue when the last task is done. I do not propose to add .clone(), but we could include that link in the docs as an option for multi-producer patterns if desired.
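
As a sketch of that responsibility (a hypothetical helper, not part of this proposal):

import asyncio

async def run_producers(queue, producers):
    # with no .clone(), the caller must track when the *last*
    # producer is done and shut the queue down itself
    try:
        await asyncio.gather(*(p(queue) for p in producers))
    finally:
        queue.shutdown()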

Zac-HD avatar May 18 '24 19:05 Zac-HD

It should be:

    async def __aiter__(self):
        try:
            while True:
                yield await self.get()
        except asyncio.QueueShutDown:
            return

Or it might be even better to implement __anext__ and do

def __aiter__(self):
    return self

graingert avatar May 19 '24 17:05 graingert

Probably should also have a task_done() call.

gvanrossum avatar May 19 '24 19:05 gvanrossum

I've included those suggestions in the top comment, sticking with __aiter__ because I don't see a clean way to support task_done() from __anext__ - and it feels more reasonable to describe queues as aiterables than aiterators.

Zac-HD avatar May 20 '24 02:05 Zac-HD

I suspect the real problem is that the scope of the try/except is too large, and should only go around the await queue.get() part? (And you need s/queue/self/.)

gvanrossum avatar May 20 '24 02:05 gvanrossum

Maybe this?

async def __aiter__(self):
    while True:
        try:
            item = await self.get()
        except asyncio.QueueShutDown:
            return
        yield item
        self.task_done()

A philosophical question is, if yield item raises (i.e., the caller throws an exception into the generator), should task_done() be called or not?
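
For illustration, one way to answer "always" would be to move task_done() into a finally (a sketch, not a recommendation either way):

async def __aiter__(self):
    while True:
        try:
            item = await self.get()
        except asyncio.QueueShutDown:
            return
        try:
            yield item
        finally:
            # runs even if the consumer throws into the generator,
            # so the item is always counted as processed
            self.task_done()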

Also, does it look like you're proposing a legit use case for async generators?

gvanrossum avatar May 20 '24 02:05 gvanrossum

Also, does it look like you're proposing a legit use case for async generators?

Yes! Generators (sync or async) are a really elegant syntax for defining iterables, and the problems motivating PEP-789 only occur if you yield (and thus suspend execution) within certain context managers. If you don't have a context manager in the generator function, and you're careful about finalization timing for PEP-533 reasons, I think they're great.

Zac-HD avatar May 20 '24 03:05 Zac-HD

In general, you should always close an asynchronous generator. So the correct use of the proposed feature would be:

async with contextlib.aclosing(aiter(queue)) as it:
    async for item in it:
        # process item

instead of the simple

async for item in queue:
    # process item

We should also consider adding a method or a global function that returns an iterator instead of making Queue an iterable. That would allow us to emit a warning if the iterator was not closed (a sketch follows below).

It is not clear what to do with task_done() if the iteration was stopped (exception or break/return) before exhausting the iterator.
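
A sketch of the warn-if-unclosed idea above, with hypothetical names:

import asyncio
import warnings

class _QueueIterator:
    # what a hypothetical Queue.iterate() method could return
    def __init__(self, queue):
        self._queue = queue
        self._closed = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self._queue.get()
        except asyncio.QueueShutDown:
            self._closed = True
            raise StopAsyncIteration

    def close(self):
        self._closed = True

    def __del__(self):
        if not self._closed:
            # unlike a plain __aiter__, the leak is detectable here
            warnings.warn(f"unclosed queue iterator {self!r}", ResourceWarning)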

serhiy-storchaka avatar May 20 '24 12:05 serhiy-storchaka

Rather than using an async generator, you could use a class with an __anext__; then you don't need to worry about exceptions being thrown in, or about the async generator not being closed.

graingert avatar May 20 '24 12:05 graingert

Like this:

class AsyncQueueIterator:
    def __init__(self, queue):
        self.queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            item = await self.queue.get()
        except asyncio.QueueShutDown:
            raise StopAsyncIteration
        else:
            return item


class Queue:
    ...
    def __aiter__(self):
        return AsyncQueueIterator(self)
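
Usage then stays the plain loop (process() is a placeholder):

async for item in queue:  # Queue.__aiter__ returns the AsyncQueueIterator
    process(item)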

gvanrossum avatar May 20 '24 15:05 gvanrossum

iter() has a two-argument form which allows you to specify a callable and a stop value. There have been many discussions about also allowing the stop exception to be specified. The same applies to aiter(). If that were implemented, you could simply write:

async for item in aiter(queue.get, stop_exc=asyncio.QueueShutDown):
    # process item

Of course, such a feature could be used in many other cases, not only with a queue.

There is another meaning of "iterating a queue":

async for item in aiter(queue.get_nowait, stop_exc=(asyncio.QueueEmpty, asyncio.QueueShutDown)):
    # process item

With this idiom you have a choice. __aiter__ can only implement one of them.
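
Neither form exists today; a user-level helper approximating the hypothetical stop_exc parameter could look like:

import inspect

async def aiter_until(fn, stop_exc):
    # fn may be sync (get_nowait) or async (get); stop_exc is an
    # exception type or a tuple of types, as in an except clause
    while True:
        try:
            item = fn()
            if inspect.isawaitable(item):
                item = await item
        except stop_exc:
            return
        yield item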

serhiy-storchaka avatar Jun 14 '24 11:06 serhiy-storchaka

I think the first implementation is the most useful; otherwise you need to create all tasks up front:

consumed everything # incorrect
0 -> queue
1 -> queue
2 -> queue
3 -> queue
4 -> queue
produced everything

Also, it isn't even asynchronous:

class QueueIterator:
    def __init__(self, queue):
        self.queue = queue

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return self.queue.get_nowait()
        except (QueueEmpty, QueueShutDown):
            raise StopIteration

It's impossible to implement this asynchronously:

  File "/Users/wannes/Documents/GitHub/cpython/add-asyncio.Queue.__aiter__/Lib/asyncio/queues.py", line 43, in __anext__
    return await self.queue.get_nowait()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object int can't be used in 'await' expression

nineteendo avatar Jun 14 '24 11:06 nineteendo

Oh, right, get_nowait() is not asynchronous, so this would be a synchronous iterator. We can add asyncio.Queue.__iter__, but it would be very confusing, and would not translate to synchronous queues.

So I propose reviving the old idea of a stop exception for iter() and aiter() (there may already be an open issue for this, maybe with a PR).

If after this we still want to make asyncio.Queue iterable, it can be implemented much more easily. But there are many other queue implementations in the stdlib. We should consider them all and decide which of them to make iterable: either make them all iterable (with the same semantics), or set a rule for why some queues are iterable and others are not.

serhiy-storchaka avatar Jun 14 '24 13:06 serhiy-storchaka

I see, so queue.Queue would be an iterable, and asyncio.Queue an asynchronous iterable.
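
Something like this for queue.Queue, assuming the same shutdown-based semantics (a sketch, not from any PR):

import queue

class Queue(queue.Queue):
    def __iter__(self):
        while True:
            try:
                yield self.get()
            except queue.ShutDown:  # raised after shutdown(); new in 3.13
                return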

nineteendo avatar Jun 14 '24 17:06 nineteendo

If after this we still want to make asyncio.Queue iterable, it can be implemented much more easily. But there are many other queue implementations in the stdlib. We should consider them all and decide which of them to make iterable: either make them all iterable (with the same semantics), or set a rule for why some queues are iterable and others are not.

I think it's reasonable to make queue types (a)iterable independently of whether (a)iter supports customized stop exceptions - if nothing else, it's a much smaller feature. I'm not taking a position on whether it's worth making the sync queue classes iterable, but if you decide yes, I did give a list of what I'd do for each in the top comment:

python/peps#3782 (review) suggested that queue.Queue could also be iterable. If we're extending this to synchronous classes I'd also include multiprocessing.Queue and multiprocessing.SimpleQueue. I'd omit multiprocessing.connection.Connection, due to the byte-level send/recv methods, and queue.SimpleQueue because without a .close() or .shutdown() method there's no clean way to shut down.

Zac-HD avatar Jun 18 '24 17:06 Zac-HD

I have made AsyncQueueIterator private. Then we can simplify it later without needing to go through the deprecation cycle.

nineteendo avatar Jun 18 '24 17:06 nineteendo

Could you elaborate on how this would work for multiprocessing.Queue and multiprocessing.SimpleQueue?

nineteendo avatar Jun 18 '24 19:06 nineteendo

I have made AsyncQueueIterator private. Then we can simplify it later without needing to go through the deprecation cycle.

This change is being reviewed and discussed in https://github.com/python/cpython/pull/120491/files/0a8a72b59b42abc67c8084053bf3e4cf2259bbb7#r1645878198 to determine if this class is necessary.

willingc avatar Jun 21 '24 00:06 willingc

@vstinner, can you close this? It'll be tracked further in the more general issue.

nineteendo avatar Jun 24 '24 13:06 nineteendo

This issue has many people involved and many comments; I don't think that it should be closed.

vstinner avatar Jun 24 '24 13:06 vstinner

I personally think the alternative solution is better, as it provides all the functionality and is consistent for both queues. Additionally, it follows the naming convention of the other functions, making it intuitive and discoverable by IDE suggestions. But opinions vary, so if I can't convince you, we can move forward with just asyncio.Queue.__aiter__().

I also think many of the points rhettinger raised equally apply to asyncio.Queue.__aiter__():

  • With shutdown being new, it is better to keep it explicit.
  • Currently the get() is an obvious marker of where blocking occurs. Code will be harder to review with it hidden behind an abstraction.
  • Currently the get() provides non-blocking [...] options which become inaccessible with this.

Minor respellings typically don't benefit users. In this case, I think it makes them worse off by hiding two essential elements that should be explicit (the get call and catching the exception).

I am clear that we don't have to do this. It only combines a while-get with a try/except. There is no new capability being added. It is perfectly reasonable to wait and see how people use shutdown().

All the PR does is hide both a get() and a try/except [...] Hiding the two steps makes users worse off. Besides adding overhead, it hides a blocking call, which in my experience is something that needs to be explicit. [...] And as noted above, it distances the user from [...] non-blocking behaviors which typically need special handling distinct from the case of a shutdown. I also don't see how this would work well with code that wants to join() a queue where a worker needs to call task_done(). In short, users that want to do anything other than one specific case are made worse off by trying to use the for-loop variant. It won't even be obvious that they have options.
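
For comparison, the explicit pattern those points favor keeps both the get() and the exception handling visible:

import asyncio

async def worker(queue):
    while True:
        try:
            item = await queue.get()   # the blocking point stays visible
        except asyncio.QueueShutDown:
            break
        try:
            ...                        # process the item here
        finally:
            queue.task_done()          # keeps queue.join() usable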

nineteendo avatar Jun 24 '24 13:06 nineteendo

@nineteendo Please slow down and listen to the guidance being shared by core developers. When creating a new feature, opening a PR and being able to code the feature are only a small part of the development process.

First and foremost, the maintainers of the module need to consider the impact of adding a new feature to:

  • the module itself
  • the other parts of the language that interact with the new feature
  • whether it breaks backward compatibility
  • whether it changes the API that other projects depend upon
  • how much cognitive load and complexity the new feature adds for users

To sum it up, new features are generally added slowly after considering the impacts.

Please also recognize that many core developers are volunteers who have many responsibilities. Being respectful of people's time is very important. Multiple requests, messages, and PRs will not move your PR forward faster.

willingc avatar Jun 24 '24 17:06 willingc

Please slow down

I wanted to quickly provide an alternative implementation before the current implementation gets merged. If I no longer get a response, I will wait until the PR is 1 month old before asking again.

First and foremost, the maintainers of the module need to consider the impact of adding a new feature

That's precisely why new features first need to be discussed on Discourse, which Guido mentioned specifically. I don't understand why there's an exception for this suggestion; there's no mention of __aiter__ on Discourse after Guido's comment.

Multiple requests, messages, and PRs will not move your PR forward faster.

It wasn't my intention to request a review again (I merged both PRs, as Zac suggested opening at most 2). Sorry for the chaos; I focused too much on the title of the issue being too narrow.

Take your time to evaluate; the PRs are drafts, so they can't be merged.

nineteendo avatar Jun 24 '24 18:06 nineteendo

Hi folks. Is this the actual thread for the asyncio.Queue.__aiter__ feature? I have seen 3 other issues and 2 PRs from @nineteendo; each one mentions the others and was closed after some comments, which gets a little messy.

Just wondering why it is so complicated to add this feature. It looks like an extension of the asyncio.Queue class that will not break backward compatibility or interact with anything else.

In any case, making the queue async-iterable would be very useful.

novoxd avatar Feb 14 '25 13:02 novoxd

Yes, this is the issue for the 3 PRs. The other 2 issues were closed as a duplicate of this one.

nineteendo avatar Feb 14 '25 20:02 nineteendo

Closing this issue as feature-declined, as Raymond describes in https://github.com/python/cpython/pull/120925#issuecomment-2370151879 (and earlier in https://github.com/python/cpython/pull/120503#issuecomment-2185001370).

Zac-HD avatar Mar 10 '25 03:03 Zac-HD