
Consider adding a helper function trio.gather

Open bobfang1992 opened this issue 2 years ago • 14 comments

Hi, similar to asyncio.gather, it would be very helpful to have a trio.gather that collects the results of several coroutines and returns them in a list. The best way to do that right now seems to be using channels, like in this gist.

The main reasons I think it would be useful are:

  • Some users, like myself, are using trio as a faster, drop-in replacement for asyncio; they may not fully understand trio's interfaces when they start out, and expect it to have an interface similar to asyncio's. This might be the wrong assumption, but by offering a similar interface we avoid surprising them, so they can become long-term users at some point.
  • Specifically for gather, I think this would be useful in a lot of scenarios. For me, I am hitting several different HTTP endpoints and gathering their results so I can process them, rather like a tiny map-reduce pipeline. I would argue this use case is common enough to deserve a standalone helper function.

Apologies first, as I might be very ignorant of the goals and design motivations of trio at this moment as a new user, so I might be totally off the rails here. But if you also find it useful, I would love to take the lead and implement this feature.

bobfang1992 avatar Dec 04 '21 21:12 bobfang1992

There's a much simpler way to do what gather() does, by using the behaviour of nurseries:

import trio

async def gather(*funcs):
    results = [None] * len(funcs)

    async def runner(func, i):
        results[i] = await func()

    async with trio.open_nursery() as nursery:
        for i, func in enumerate(funcs):
            nursery.start_soon(runner, func, i)
    return results

A nursery only exits once all its children are complete, so all that needs to be done is make a list to collect results, wrap each function in a coroutine that runs it and stores the result, then schedule all of those to run.
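
For illustration, usage might look like this (get_contents is a hypothetical coroutine taking a URL; functools.partial binds the arguments, since this gather() takes no-argument callables):

import functools

page1, page2 = await gather(
    functools.partial(get_contents, 'https://www.phoenix-int.com'),
    functools.partial(get_contents, 'https://www.google.com'),
)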

This doesn't match asyncio's cancellation/error behaviour though - if an error occurs, all the tasks are cancelled and the results are discarded. That could be tweaked by adding a try/except to the runner function and, say, just storing the exception then calling nursery.cancel_scope.cancel(), or not cancelling at all if you want the rest to continue.
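
For reference, a minimal sketch of that tweak, storing the exception in its result slot rather than cancelling (roughly asyncio.gather(return_exceptions=True)):

import trio

async def gather_settled(*funcs):
    results = [None] * len(funcs)

    async def runner(func, i):
        try:
            results[i] = await func()
        except Exception as exc:
            # Store the failure instead of letting it cancel the nursery.
            # (trio.Cancelled is a BaseException, so it still propagates.)
            results[i] = exc

    async with trio.open_nursery() as nursery:
        for i, func in enumerate(funcs):
            nursery.start_soon(runner, func, i)
    return results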

TeamSpen210 avatar Dec 05 '21 03:12 TeamSpen210

If you want to catch exceptions, outcome is your friend.
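
For instance, a sketch using outcome.acapture() (outcome is the small package Trio itself depends on) to record each task's value or exception; note that acapture() also captures base exceptions like trio.Cancelled, so unwrap results rather than discarding them:

import outcome
import trio

async def gather_outcomes(*funcs):
    results = [None] * len(funcs)

    async def runner(func, i):
        # acapture() returns an outcome.Value or outcome.Error rather
        # than letting the exception propagate and cancel the nursery.
        results[i] = await outcome.acapture(func)

    async with trio.open_nursery() as nursery:
        for i, func in enumerate(funcs):
            nursery.start_soon(runner, func, i)
    # results[i].unwrap() returns the value or re-raises the exception.
    return results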

In any case, the point of Structured Concurrency is, well, structure. Trio doesn't have the (somewhat unstructured) interface of asyncio precisely because the latter's interfaces don't afford structured use unless you jump through hoops the way anyio does.

Also, asyncio.gather is called with a list of tasks and/or futures. Trio doesn't have either of these.

What Trio does have instead are memory channels. Open one, start the "reduce" task that reads from the channel and collects the results, then use a nursery to run your HTTP endpoints and feed the results to the channel as they come in. This way actually requires less code than using gather, allows concurrency between the map and reduce phases, and allows for incremental generation of "map" tasks instead of starting them all at once.
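
A hedged sketch of that shape, with fetch standing in for any coroutine that downloads one URL (a hypothetical name, not a Trio API):

import trio

async def map_reduce(urls, fetch):
    send_channel, receive_channel = trio.open_memory_channel(0)
    results = {}

    async def producer(url, channel):
        # Each producer owns a clone and closes it on exit; once every
        # clone (and the original) is closed, the receive side ends.
        async with channel:
            await channel.send((url, await fetch(url)))

    async with trio.open_nursery() as nursery:
        async with send_channel:
            for url in urls:
                nursery.start_soon(producer, url, send_channel.clone())
        # The "reduce" loop runs concurrently with the producers,
        # handling each result as it arrives.
        async for url, body in receive_channel:
            results[url] = body
    return results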

smurfix avatar Dec 05 '21 10:12 smurfix

The context of the original question was "let me download several pages at once, then do something with them." This seems like a very common use case to me. Implementing it in asyncio is simple and easy to remember:

    (result1, result2) = await asyncio.gather(
        get_contents('https://www.phoenix-int.com'),
        get_contents('https://www.google.com'))

What I'm saying is that while everything said previously is true, this particular case is a simple one that needs a simple answer. If I have to define functions, instantiate memory channels, and look up on the web how to accomplish this simple task, it feels like something is off the rails.

The whole point of the async/await movement is that you can write simple code that reads the way you think (procedurally), but is automatically converted into what the computer needs (async continuations). The decision to avoid gather and/or Futures is forcing me to think in continuations again.

Note that having some structure available to accomplish this does not mean all the various options given above, with all of their advanced benefits, won't still be available.

phxnsharp avatar May 19 '22 15:05 phxnsharp

Perhaps the issue is really a more general one: the return values of nursery-spawned tasks are discarded, meaning they have to use alternate methods to pass values back. That limits the reusability of the functions and makes things like this a little awkward. One solution would be to add a deliver= parameter like trio.lowlevel.start_thread_soon() has, which is called with the result when the task finishes. You could then just pass result_list.append in to collect the values. Or perhaps we need a more general function-composition mechanism, since that's what we're trying to do here...

TeamSpen210 avatar May 20 '22 02:05 TeamSpen210

I can already use a lambda that calls result_list.append(); I'm not sure what the advantage of having a deliver= parameter is.

phxnsharp avatar May 20 '22 15:05 phxnsharp

It'd be equivalent to passing this to start_soon():

async def task():
    deliver(await func(*args))

The problem is that you can't write an async lambda, so you either have to write a full function definition to be able to pass it through start_soon(), or build the delivery into the task itself (which makes it more difficult to reuse elsewhere). With that parameter, your gather example becomes:

results = []
async with trio.open_nursery() as nursery:
    for url in ['https://www.phoenix-int.com', 'https://www.google.com']:
        nursery.start_soon(get_contents, url, deliver=results.append)

So once get_contents completes successfully, its return value is passed to deliver(), which stores it.
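
Until something like that exists, here's a hedged sketch of the same idea that works today: since start_soon() only forwards positional arguments, functools.partial binds the deliver callback (run_and_deliver is an invented helper, and get_contents is from the example above):

import functools
import trio

async def run_and_deliver(func, *args, deliver):
    # Run func and hand its return value to the deliver callback.
    deliver(await func(*args))

async def main():
    results = []
    async with trio.open_nursery() as nursery:
        for url in ['https://www.phoenix-int.com', 'https://www.google.com']:
            nursery.start_soon(functools.partial(
                run_and_deliver, get_contents, url, deliver=results.append))
    print(results)  # Note: in completion order, not start order.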

TeamSpen210 avatar May 20 '22 22:05 TeamSpen210

Ahh yes, you can't have an async lambda.

It seems to me there is a fundamental incompatibility between the desire to control parallelism through the nursery metaphor and the desire to have simple procedural code. Your solution works, but it requires thinking in completions and randomizes the result ordering. You could even argue that it violates the principle that "Every function executes in a straightforward, top-to-bottom manner, finishing each operation before moving on to the next – like Guido intended."

I would argue that the gather function is the best compromise. It:

  • follows Guido's intentions: the line of code finishes completely before the next line can run.
  • allows for gathering exceptions, like the nursery metaphor
  • allows cancellations to cleanly pass to the children
  • doesn't introduce any futures or other concepts that could allow someone to 'work around' the nursery concept
  • doesn't invalidate the nursery metaphor, which can co-exist with this for other use cases.

Ultimately you will have to decide if the use case presented here is common enough to be worthy of the time it would take to implement it.

phxnsharp avatar May 24 '22 13:05 phxnsharp

I don't like the idea of adding another argument to .start_soon (it annoys me that some already exist, and that they're pretty much the reason keyword arguments can't/aren't passed to the async function, but that's a different topic).

Could we not have a nursery.start_with_result (or something) that causes Trio to store the result, with a lifetime the same as the nursery's? Trio would track the order in which this function is called, and then you could do a nursery.wait_for_results(). This probably has some memory considerations, but it would be almost equivalent to asyncio.gather(), except the end user doesn't keep references to some sort of task object nor need to create a large list of partial functions to pass into a gather function.

The nursery.wait_for_results() may not even be necessary, as the nursery only completes once all tasks are finished. Having a nursery.results at the end should be sufficient, no?
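
A hedged sketch of what that hypothetical interface might look like as a plain wrapper today (start_with_result and results are the invented names from this comment, not Trio API):

import trio

class ResultNursery:
    # Results live as long as this object and are stored in the order
    # start_with_result() was called.
    def __init__(self, nursery):
        self._nursery = nursery
        self.results = []

    def start_with_result(self, func, *args):
        index = len(self.results)
        self.results.append(None)

        async def runner():
            self.results[index] = await func(*args)

        self._nursery.start_soon(runner)

async def double(x):
    await trio.sleep(0)
    return x * 2

async def main():
    async with trio.open_nursery() as nursery:
        rn = ResultNursery(nursery)
        for i in range(3):
            rn.start_with_result(double, i)
    # The nursery has exited, so every slot is filled.
    print(rn.results)  # [0, 2, 4]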

Sxderp avatar May 24 '22 16:05 Sxderp

One problem with both adding a result list to a nursery and gather() is that they tie the destination of a task's result to the nursery itself. If you want to spawn additional tasks (say, if you're recursing), or want to put the results of specific tasks in different result lists, these solutions wouldn't work at all. For instance, what if you want to store a result at a specific index, or in a dict?

TeamSpen210 avatar May 24 '22 22:05 TeamSpen210

Looks like https://github.com/python-trio/trimeter and https://github.com/florimondmanca/aiometer haven't been mentioned in this issue yet. Unfortunately, they're not really maintained.

pquentin avatar May 25 '22 06:05 pquentin

For instance what if you're wanting to store in a specific index, or into a dict?

asyncio.gather doesn't really let you do this while the tasks are running either. You get the result of asyncio.gather and assign it to something [1], so I'm not sure this is that big of a dealbreaker.

If you want to spawn additional tasks (say if you're recursing)

I'm not sure I understand what you're saying. This might be problematic if you use a standard return (which is the most obvious way, I suppose), but if the interface is closer to nursery.start, does this issue go away? It's hard to suggest something since I feel I'm missing some context.

[1] From the docs: L = await asyncio.gather(factorial("A", 2), factorial("B", 3), factorial("C", 4))

Sxderp avatar May 25 '22 15:05 Sxderp

Sure, if all you need is a list of results, then go ahead and use the gather function from this issue's first answer. That's reasonably simple and probably should be in the docs somewhere.

However, frequently you don't need a list, you just need the results, and that raises a number of questions:

  • Do you care about ordering? If so, can you start processing the first result while the others are still being computed? If not, do you need to know which position each result came from, or is it enough to know when you've processed them all?
  • How do you handle errors? How about cancellations?
  • Do you need to add more tasks dynamically?

gather is just one solution in this problem space (and IMHO, as far as async interfaces go, more often than not a rather suboptimal one). Why should it be privileged?

smurfix avatar May 25 '22 20:05 smurfix

I'd like to emphasize the request made in this issue and more specifically in this comment:

The context of the original question was "let me download several pages at once, then do something with them." This seems like a very common use case to me. Implementing it in asyncio is simple and easy to remember:

    (result1, result2) = await asyncio.gather(
        get_contents('https://www.phoenix-int.com'),
        get_contents('https://www.google.com'))

What I'm saying is that while everything said previously is true, this particular case is a simple one that needs a simple answer. If I have to define functions, instantiate memory channels, and look up on the web how to accomplish this simple task, it feels like something is off the rails.

The whole point of the async/await movement is that you can write simple code that reads the way you think (procedurally), but is automatically converted into what the computer needs (async continuations). The decision to avoid gather and/or Futures is forcing me to think in continuations again.

Note that having some structure available to accomplish this does not mean all the various options given above, with all of their advanced benefits, won't still be available.

Trio is a genuinely great async library in design because of the correctness goal it aims for, which is perfectly summed up by the author's blog post on structured concurrency. However, I have found that Trio nurseries generate a lot of friction in practice, mostly because they have no built-in mechanism for retrieving values that are computed concurrently (either as soon as they are ready or once they are all done). As can be seen in this thread, users have to write boilerplate code and use sub-optimal constructs (such as memory channels) to achieve the intended behavior, and this can lead to incorrect code. IMO a trio.gather function would fit the library nicely and would help people get concurrency right.

I'm using another language, Swift, which I think also got structured concurrency right and could be a great source of inspiration. Swift's TaskGroup is the equivalent of a Trio nursery and can return data through a built-in async-iterator interface:

func asyncRandomInt() async -> Int {
  try! await Task.sleep(nanoseconds: 1_000_000_000)
  return Int.random(in: 0..<10)
}

let randomIntegers = await withTaskGroup(of: Int.self) { group in  // Equivalent to `async with trio.open_nursery() as nursery`
  for _ in 0..<100 {
    group.addTask {  // Equivalent to `nursery.start_soon()`
      await asyncRandomInt()
    }
  }

  // Async-iterator interface to process items as soon as they arrive (out of order)
  var numbers = [Int]()
  for await number in group {
    numbers.append(number)
  }

  return numbers
}

// The group exits when all child tasks are done
// Can use the resulting `randomIntegers`

Swift also has a syntactic feature to schedule a fixed number of non-homogeneous work items concurrently:

// Those three tasks are executed concurrently
async let number1 = asyncRandomInt()
async let number2 = asyncRandomInt()
async let string = someAsyncFunctionReturningAString()

await print(string, number1 + number2)

Which would be pretty similar to what trio.gather would allow:

number1, number2, string = await trio.gather(
  asyncRandomInt, 
  asyncRandomInt,
  someAsyncFunctionReturningAString
)

print(string, number1 + number2)

laclouis5 avatar Sep 03 '22 14:09 laclouis5

Yeah, well, we don't have those syntactic features, and the merits of the Swift syntax are in the eye of the beholder.

If you want a result stream rather than a gather-style list, a taskgroup wrapper that sends the return values of your task functions to a memory stream, which you can then iterate over, is 70 lines of code. No boilerplate required. Feel free to share and adapt.

https://gist.github.com/smurfix/0130817fa5ba6d3bb4a0f00e4d93cf86

smurfix avatar Sep 03 '22 18:09 smurfix

Closing this because I think everyone has somewhat different requirements, and so it's better implemented downstream.

Zac-HD avatar Mar 17 '23 03:03 Zac-HD

I was very frustrated by the lack of a way to get a function's return value too; IMHO that really hurts code reuse. For anyone who feels the same: there is a library called asyncer by the great author of FastAPI. It's small, and its soonify function returns the value! I haven't seen it discussed much, but it's pretty neat.
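
For reference, the basic asyncer pattern looks roughly like this (a sketch based on its documentation; soonify returns a callable whose result is a SoonValue, and .value becomes available once the task group exits):

import asyncer

async def double(x: int) -> int:
    return x * 2

async def main():
    async with asyncer.create_task_group() as task_group:
        soon_value = task_group.soonify(double)(21)
    # Outside the task group, all tasks have finished.
    print(soon_value.value)  # 42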

sudoandros avatar Jul 14 '23 16:07 sudoandros

As well as asyncer, I've written a similar library, aioresult. (Apologies to those subscribed to multiple issues where I've mentioned it; I hope I'm not being too spammy.) asyncer is definitely interesting, though it has slightly different trade-offs to aioresult, e.g. you can't use it to wait for a result, and its odd parameter mechanism and its own special nursery type seem a bit clumsy to me. But it does have really good type hinting (and a much better landing page!).

As others have said above, once you have a way to capture a return value, you can combine that with a nursery to effectively get gather():

import trio
from aioresult import ResultCapture

async with trio.open_nursery() as n:
    results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]
print("results:", *[r.result() for r in results])

In a sense, it's even better than gather() (aside from the better exception and cancellation handling) because you can put the result objects into any structure you like, e.g., a dict.
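
For example, a sketch of the dict variant (same ResultCapture API as above, with get_contents assumed from earlier in the thread):

import trio
from aioresult import ResultCapture

async def fetch_all():
    urls = ['https://www.phoenix-int.com', 'https://www.google.com']
    async with trio.open_nursery() as n:
        captures = {url: ResultCapture.start_soon(n, get_contents, url)
                    for url in urls}
    return {url: c.result() for url, c in captures.items()}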

aioresult also has a wait_all() function, which is a bit like gather() but is not normally what you'd want (and it doesn't open a nursery for you, so you'd still need to do that).

arthur-tacca avatar Feb 15 '24 15:02 arthur-tacca