prefect
Add a `gather()`-type operation to resolve a `list[PrefectFuture]` into a `list[Result]`
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar request and didn't find it.
- [X] I searched the Prefect documentation for this feature.
Prefect Version
2.x
Describe the current behavior
When running a subflow in Prefect that returns a `list[PrefectFuture]`, the user has to iterate through each entry in the list and manually resolve each future to retrieve the desired `list[Result]`. Consider the following toy example:
```python
from prefect import flow, task

@task
def add(a, b):
    return a + b

@flow
def add_distributed(vals, b):
    outputs = []
    for val in vals:
        output = add.submit(val, b)
        outputs.append(output)
    return outputs

@flow
def workflow(vals, b):
    return add_distributed(vals, b)

futures = add_distributed([1, 2], 3)  # list[PrefectFuture]
results = [future.result() for future in futures]  # list[int]
```
As seen above, the only way to resolve the futures is to iterate through each entry and manually resolve each one. This has two notable downsides: 1) it is not intuitive to all end users of the Prefect flow; 2) it requires blocking `.result()` calls if this kind of pattern is part of a larger workflow.
Describe the proposed behavior
Inspired by options like Dask's `Client.gather()` operation, Prefect should consider adding a similar gather-type operation that would allow a single function to be called on the `list[PrefectFuture]` to resolve the items in the iterable.
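A minimal sketch of what such a single-call gather could look like, assuming only that every item exposes a blocking `.result()` method. `gather` and its `errors` parameter are hypothetical names, not an existing Prefect API, and standard-library `concurrent.futures` futures stand in for `PrefectFuture` so the snippet runs without Prefect:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable, List


def gather(futures: Iterable[Any], errors: str = "raise") -> List[Any]:
    """Resolve every item's blocking .result() into a plain list.

    Hypothetical helper: works with any object exposing .result(), here
    concurrent.futures.Future as a stand-in for PrefectFuture.
    errors="raise" re-raises the first failure encountered in list order;
    errors="skip" drops failed items from the output.
    """
    if errors not in ("raise", "skip"):
        raise ValueError("errors must be 'raise' or 'skip'")
    results = []
    for future in futures:
        try:
            results.append(future.result())
        except Exception:
            if errors == "raise":
                raise
            # errors == "skip": leave the failed item out of the output
    return results


def add(a, b):
    return a + b


if __name__ == "__main__":
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(add, val, 3) for val in [1, 2]]
        print(gather(futures))  # [4, 5]
```

The caller gets one function call instead of a hand-written loop, which addresses the first downside above; the calls are still blocking, which the second downside is about.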
Example Use
No response
Additional context
No response
Hi @Andrew-S-Rosen, I could see convenience in having a `gather_futures` util that handles `asyncio.gather`-ing the `.result()` calls for you.
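One way such a util might be sketched, assuming Python 3.9+ for `asyncio.to_thread`. `gather_futures` is used here as a hypothetical helper (not an existing Prefect function), and standard-library `concurrent.futures` futures stand in for `PrefectFuture`. Each blocking `.result()` call is moved to a worker thread so that `asyncio.gather` can wait on all of them at once:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable, List


async def gather_futures(futures: Iterable[Any]) -> List[Any]:
    """Await many blocking .result() calls concurrently.

    Hypothetical helper: each .result() call runs in a worker thread via
    asyncio.to_thread, and asyncio.gather returns the values in input order.
    """
    return await asyncio.gather(*(asyncio.to_thread(f.result) for f in futures))


def add(a, b):
    return a + b


async def main():
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(add, val, 3) for val in [1, 2]]
        return await gather_futures(futures)


if __name__ == "__main__":
    print(asyncio.run(main()))  # [4, 5]
```

Offloading to threads keeps the event loop free while the blocking calls wait, so this pattern can be awaited inside a larger async workflow.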
Would you be interested in contributing this?
Glad to hear you also could see the value in a utility like this!
Unfortunately, I don't have a great grasp of asyncio so don't know that this is one I'll be able to tackle personally. If I find some time, I'll try but would say the probability for success is low!
@Andrew-S-Rosen no worries! we'll put this on the backlog and get to it when we can
Just as a note, if it's helpful, you can simplify your example above using `.map`:
```python
from prefect import flow, task

@task
def add(a, b):
    return a + b

@flow
def workflow(vals: list, b: int):
    return [future.result() for future in add.map(vals, b)]

print(workflow([1, 2], 3))
```
@zzstoatzz can you assign it to me please?
@eladm26 I don't think you need to be assigned on it. You can simply submit a PR.
@Andrew-S-Rosen I just stumbled upon `resolve_futures_to_data`; it uses `asyncio.gather` under the hood. I think this util function should address your needs. @zzstoatzz what do you think?
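To illustrate the general idea only (this is not Prefect's implementation), a recursive resolver can collect every future from a nested collection, resolve them concurrently with `asyncio.gather`, and rebuild the same structure with results in place of futures. `resolve_nested` is a hypothetical name, `concurrent.futures.Future` stands in for `PrefectFuture`, and Python 3.9+ is assumed for `asyncio.to_thread`:

```python
import asyncio
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any


async def resolve_nested(expr: Any) -> Any:
    """Return a copy of expr with every Future replaced by its result.

    Hypothetical sketch: walks nested lists, tuples and dicts, resolves the
    futures it finds concurrently, then rebuilds the original structure.
    """
    found: list = []

    def collect(obj: Any) -> None:
        if isinstance(obj, Future):
            found.append(obj)
        elif isinstance(obj, (list, tuple)):
            for item in obj:
                collect(item)
        elif isinstance(obj, dict):
            for value in obj.values():
                collect(value)

    collect(expr)
    # Run the blocking .result() calls in threads and wait for all of them
    values = await asyncio.gather(*(asyncio.to_thread(f.result) for f in found))
    resolved = {id(f): v for f, v in zip(found, values)}

    def rebuild(obj: Any) -> Any:
        if isinstance(obj, Future):
            return resolved[id(obj)]
        if isinstance(obj, list):
            return [rebuild(item) for item in obj]
        if isinstance(obj, tuple):
            return tuple(rebuild(item) for item in obj)
        if isinstance(obj, dict):
            return {key: rebuild(value) for key, value in obj.items()}
        return obj  # non-future, unsupported types pass through unchanged

    return rebuild(expr)


def add(a, b):
    return a + b


async def main():
    with ThreadPoolExecutor() as pool:
        data = {"sums": [pool.submit(add, 1, 3), pool.submit(add, 2, 3)], "label": "demo"}
        return await resolve_nested(data)


if __name__ == "__main__":
    print(asyncio.run(main()))  # {'sums': [4, 5], 'label': 'demo'}
```

Objects that are not futures pass through unchanged, so handing this sketch a list of already-resolved objects returns them as-is, which matches the behavior reported later in this thread.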
@eladm26 good find! That seems like it would be it, although I'm having some trouble confirming this. I think it doesn't quite work as expected because, in the proposed example, the tasks are not async-based? I imagine `await asyncio.gather()` inside the function itself isn't doing anything in this particular case.
```python
from prefect import flow, task
from prefect.futures import resolve_futures_to_data

@task
def add(a, b):
    return a + b

@flow
def add_distributed(vals, b):
    outputs = []
    for val in vals:
        output = add.submit(val, b)
        outputs.append(output)
    return outputs

@flow
def workflow(vals, b):
    return add_distributed(vals, b)

futures = add_distributed([1, 2], 3)  # list[PrefectFuture]
# Top-level `await` only works where an event loop is already running (e.g. IPython/Jupyter)
results = await resolve_futures_to_data(futures)
assert futures != results  # this fails
```
When you return a `PrefectFuture` from a flow, you get a `State`, and you can call `.result()` on it.
```python
from prefect import flow, task

@task
def add(a, b):
    return a + b

@flow
def add_distributed(vals, b):
    outputs = []
    for val in vals:
        output = add.submit(val, b)
        outputs.append(output)
    return outputs

@flow
def workflow(vals, b):
    states = add_distributed(vals, b)
    for state in states:
        print(type(state))
        print(state.result())

workflow([1, 2], 3)
```

```
<class 'prefect.client.schemas.objects.State'>
4
<class 'prefect.client.schemas.objects.State'>
5
```
`resolve_futures_to_data` works fine if you call it inside `add_distributed`, where the `PrefectFuture`s are still present.
I see. Thanks, @kevingrismore! In my scenario, I was hoping to call the gather-type operation afterwards to minimize the amount of Prefect-specific logic inside the `@flow` definition itself, but this is very useful to know.
So, is this issue solved, @Andrew-S-Rosen @zzstoatzz?
@eladm26 I suppose it can be closed, as my title does have a corresponding function.
Thank you dude