Add a `gather()`-type operation to resolve a `list[PrefectFuture]` into a `list[Result]`

Open Andrew-S-Rosen opened this issue 1 year ago • 3 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar request and didn't find it.
  • [X] I searched the Prefect documentation for this feature.

Prefect Version

2.x

Describe the current behavior

When running a subflow in Prefect that returns a list[PrefectFuture], the user has to iterate through each entry in the list and manually resolve each future to retrieve the desired list[Result]. Consider the following toy example:

from prefect import flow, task


@task
def add(a, b):
    return a + b


@flow
def add_distributed(vals, b):
    outputs = []
    for val in vals:
        output = add.submit(val, b)
        outputs.append(output)
    return outputs


@flow
def workflow(vals, b):
    return add_distributed(vals, b)

futures = add_distributed([1, 2], 3)  # list[PrefectFuture]
results = [future.result() for future in futures]  # list[int]

As seen above, the only way to resolve the futures is by iterating through each entry and manually resolving each one.

This has two notable downsides: 1) It is not intuitive to all end-users of the Prefect flow; 2) It requires blocking .result() calls to be used if this kind of pattern is part of a larger workflow.

Describe the proposed behavior

Inspired by options like Dask's Client.gather() operation, Prefect should consider adding a similar gather-type operation that would allow for a single function to be called on the list[PrefectFuture] to resolve the items in the iterable.
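As a rough illustration of the requested behavior, here is a minimal sketch of what a gather-style helper might look like. The `gather` name and signature are hypothetical, not an existing Prefect API. To keep the example runnable without Prefect, it uses `concurrent.futures` futures as stand-ins, on the assumption that a `PrefectFuture` exposes a comparable blocking `.result()` method.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable, List


def gather(futures: Iterable[Any]) -> List[Any]:
    """Hypothetical helper: resolve every future-like object in order.

    Assumes each item exposes a blocking .result() method.
    """
    return [future.result() for future in futures]


def add(a, b):
    return a + b


def demo() -> List[int]:
    # Stand-in for add.submit(...): a thread pool returns objects with .result().
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(add, val, 3) for val in [1, 2]]
        return gather(futures)


if __name__ == "__main__":
    print(demo())
```

The helper only hides the loop; it still blocks until every future has finished, so it addresses the first downside listed above but not the second.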

Example Use

No response

Additional context

No response

Andrew-S-Rosen avatar Jan 21 '24 03:01 Andrew-S-Rosen

hi @Andrew-S-Rosen - I could see convenience in having a gather_futures util that handles asyncio.gathering the .result() calls for you.

Would you be interested in contributing this?
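For anyone picking this up, the `gather_futures` idea mentioned above could be sketched roughly as follows. This is an assumption about the shape of such a utility, not code from Prefect: the helper name is taken from the comment, the implementation is hypothetical, and `concurrent.futures` futures stand in for `PrefectFuture` objects so the example runs on its own.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def add(a, b):
    return a + b


async def gather_futures(futures):
    """Hypothetical helper: await many blocking .result() calls concurrently."""
    # Each blocking .result() call runs in a worker thread, so the event
    # loop stays free; asyncio.gather returns results in input order.
    return await asyncio.gather(
        *(asyncio.to_thread(future.result) for future in futures)
    )


def demo():
    # Stand-in for add.submit(...): a thread pool returns objects with .result().
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(add, val, 3) for val in [1, 2]]
        return asyncio.run(gather_futures(futures))


if __name__ == "__main__":
    print(demo())
```

`asyncio.to_thread` requires Python 3.9 or newer.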

zzstoatzz avatar Jan 23 '24 18:01 zzstoatzz

Glad to hear you also could see the value in a utility like this!

Unfortunately, I don't have a great grasp of asyncio so don't know that this is one I'll be able to tackle personally. If I find some time, I'll try but would say the probability for success is low!

Andrew-S-Rosen avatar Jan 23 '24 18:01 Andrew-S-Rosen

@Andrew-S-Rosen no worries! we'll put this on the backlog and get to it when we can

just as a note if it's helpful, you can simplify your example above using .map

from prefect import flow, task

@task
def add(a, b):
    return a + b

@flow
def workflow(vals: list, b: int):
    return [future.result() for future in add.map(vals, b)]

print(workflow([1, 2], 3))

zzstoatzz avatar Jan 23 '24 18:01 zzstoatzz

@zzstoatzz can you assign it to me please?

eladm26 avatar Feb 22 '24 08:02 eladm26

@eladm26 I don't think you need to be assigned on it. You can simply submit a PR.

Andrew-S-Rosen avatar Feb 24 '24 17:02 Andrew-S-Rosen

@Andrew-S-Rosen I just stumbled upon resolve_futures_to_data. It uses asyncio.gather under the hood, and I think this util function should address your needs. @zzstoatzz what do you think?

eladm26 avatar Feb 25 '24 16:02 eladm26

@eladm26 good find! That seems like it would be it, although I'm having some trouble confirming this. I think it doesn't quite work as expected because in the proposed example, the tasks are not async-based? I imagine await asyncio.gather() inside the function itself isn't doing anything in this particular case.

from prefect import flow, task
from prefect.futures import resolve_futures_to_data


@task
def add(a, b):
    return a + b


@flow
def add_distributed(vals, b):
    outputs = []
    for val in vals:
        output = add.submit(val, b)
        outputs.append(output)
    return outputs


@flow
def workflow(vals, b):
    return add_distributed(vals, b)

futures = add_distributed([1, 2], 3)  # list[PrefectFuture]
results = await resolve_futures_to_data(futures)
assert futures != results # this fails

Andrew-S-Rosen avatar Feb 25 '24 16:02 Andrew-S-Rosen

When you return a PrefectFuture from a flow, you get a State and you can call .result() on it.

from prefect import flow, task


@task
def add(a, b):
    return a + b


@flow
def add_distributed(vals, b):
    outputs = []
    for val in vals:
        output = add.submit(val, b)
        outputs.append(output)
    return outputs


@flow
def workflow(vals, b):
    states = add_distributed(vals, b)
    for state in states:
        print(type(state))
        print(state.result())

workflow([1, 2], 3)

<class 'prefect.client.schemas.objects.State'>
4
<class 'prefect.client.schemas.objects.State'>
5

resolve_futures_to_data works fine if you call it inside add_distributed where the PrefectFutures are still present.

kevingrismore avatar Feb 25 '24 16:02 kevingrismore

I see. Thanks, @kevingrismore! In my scenario, I was hoping to be able to call the gather-type operation afterwards so as to minimize the injection of Prefect-specific logic inside the @flow definition itself, but this is very useful to know and will be very helpful.

Andrew-S-Rosen avatar Feb 25 '24 18:02 Andrew-S-Rosen

So, is this issue solved? @Andrew-S-Rosen @zzstoatzz ?

eladm26 avatar Feb 26 '24 08:02 eladm26

@eladm26 I suppose it can be closed, as my title does have a corresponding function.

Andrew-S-Rosen avatar Feb 26 '24 08:02 Andrew-S-Rosen

Thank you dude


eladm26 avatar Feb 26 '24 09:02 eladm26