trio-util icon indicating copy to clipboard operation
trio-util copied to clipboard

[feature request] async as_completed function

Open dhirschfeld opened this issue 3 years ago • 7 comments

I think an async-iterable function which took functions to execute asynchronously and yielded the results as an Outcome objects would be very useful.

I find the daskas_completed api to be very convenient and am looking for something similar in trio.

I think it's a more fundamental primitive than either wait_any or wait_all as (I think) both could be implemented with an async def as_completed(*funcs) -> AsyncIterable function.

dhirschfeld avatar Jul 31 '20 11:07 dhirschfeld

There was some discussion of this on a Trio issue, https://github.com/python-trio/trio/issues/1089#issuecomment-521552829, and that thread noted the obstacle that a generator can't yield from an internal nursery. So you end up having to pass in an external nursery, or hop through a context manager.

belm0 avatar Aug 01 '20 10:08 belm0

Thanks for the link @belm0 - an interesting discussion! I don't think having to pass in a nursery is too onerous. Perhaps an api like below could be made to work?

async with trio.open_nursery() as nursery:
    async for idx, outcome in as_completed(*async_fns, nursery=nursery):
        nursery.start_soon(process_result, outcome.unwrap())

dhirschfeld avatar Aug 01 '20 11:08 dhirschfeld

I thought I'd naively try to use trio_async_generator to implement this but failed at the first hurdle. Haven't had time to look into it but thought I'd post in case it's obvious what I'm doing wrong.

from functools import partial
from typing import Any, AsyncIterable, Awaitable, Callable, Tuple
from trio_util import trio_async_generator


@trio_async_generator
async def as_completed(
    *funcs: Callable[[], Awaitable[Any]]
) -> AsyncIterable[Tuple[int, Any]]:

    async def yield_result(func: Callable[[], Awaitable[Any]], *, idx: int) -> None:
        result = await func()
        yield idx, result

    async with trio.open_nursery() as nursery:
        for idx, func in enumerate(funcs):
            nursery.start_soon(
                partial(yield_result, func, idx=idx)
            )
async def f(x): 
    return x


async with as_completed(partial(f, 1), partial(f, 2)) as results:
    async for idx, result in results:
        print(idx, result)
Traceback (most recent call last):
  File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio_util\_trio_async_generator.py", line 50, in adapter
    value = await agen.__anext__()
AttributeError: 'coroutine' object has no attribute '__anext__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\dhirschf\envs\dev\lib\site-packages\IPython\core\interactiveshell.py", line 3435, in run_code
    last_expr = (await self._async_exec(code_obj, self.user_ns))
  File "<ipython-input-94-2025cd5b27a0>", line 6, in async-def-wrapper
  File "C:\Users\dhirschf\envs\dev\lib\contextlib.py", line 177, in __aexit__
    await self.gen.__anext__()
  File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio_util\_trio_async_generator.py", line 72, in wrapper
    yield receive_channel
  File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio\_core\_run.py", line 813, in __aexit__
    raise combined_error_from_nursery
  File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio_util\_trio_async_generator.py", line 68, in adapter
    return
  File "C:\Users\dhirschf\envs\dev\lib\site-packages\async_generator\_util.py", line 14, in __aexit__
    await self._aiter.aclose()
AttributeError: 'coroutine' object has no attribute 'aclose'

I'm testing on py37 in case that makes a difference.

dhirschfeld avatar Apr 13 '22 12:04 dhirschfeld

I think the body of as_completed() itself needs a yield to be a generator. The yield in yield_result() applies to that nested function (making it an async generator), rather than applying to as_completed().

belm0 avatar Apr 13 '22 12:04 belm0

Yeah, I figured that wouldn't work 🤦‍♂️

Will have to think how I can yield as results arrive 🤔

dhirschfeld avatar Apr 13 '22 18:04 dhirschfeld

I guess the function results feed into a trio memory channel, and the main loop just yields incoming items?

belm0 avatar Apr 13 '22 23:04 belm0

I guess the function results feed into a trio memory channel, and the main loop just yields incoming items?

That sounds like it could work - will give it a go and report back!

dhirschfeld avatar Apr 14 '22 01:04 dhirschfeld