
[Core feature] Support simple dataclass construction from promises in workflow/dynamic

Open madwed-stripe opened this issue 10 months ago • 7 comments

Motivation: Why do you think this is important?

A common pain point for our systems is the need to add a create_dataclass task at the end of a dynamic workflow. It would be wonderful to support dataclass construction from promises in a dynamic (and in a workflow).

Goal: What should the final outcome look like, ideally?

Here's a simple example of what we need to do today:

from dataclasses import dataclass
from flytekit import dynamic, task


@dataclass
class MyCollection:
    values: dict[str, float]


@task
def transform_item(item: float) -> float:
    return 1.0


@task
def make_collection(values: dict[str, float]) -> MyCollection:
    return MyCollection(values)


@dynamic
def transform_collection(collection: MyCollection) -> MyCollection:
    transformed = {k: transform_item(item=v) for k, v in collection.values.items()}
    return make_collection(values=transformed)

We run a ton of these tasks and the host setup time adds up. We'd like to be able to write:

@dynamic
def transform_collection(collection: MyCollection) -> MyCollection:
    transformed = {k: transform_item(item=v) for k, v in collection.values.items()}
    return MyCollection(transformed)

Describe alternatives you've considered

Maybe this would be possible with a custom transformer, but I'd rather this be part of Flyte core. We could also achieve something like this with @eager, but then we'd need to keep the dynamic host around for the duration of the contained transform_item operations, which can take quite a long time.

Propose: Link/Inline OR Additional context

No response

Are you sure this issue hasn't been raised already?

  • [x] Yes

Have you read the Code of Conduct?

  • [x] Yes

madwed-stripe • Feb 26 '25 16:02

Thank you for opening your first issue here! 🛠

welcome[bot] • Feb 26 '25 16:02

A workaround is to start with an empty dict, iterate through the tasks, and add the results to the dict in the dynamic workflow:

from dataclasses import dataclass
from flytekit import task, dynamic, workflow


@dataclass
class MyCollection:
    stuff: dict[str, float]


@task
def transform_item(item: float) -> float:
    return 2.0 * item


@task
def make_collection(values: dict[str, float]) -> MyCollection:
    return MyCollection(values)


@dynamic
def transform_collection(collection: MyCollection) -> MyCollection:
    transformed = {}
    for k, v in collection.stuff.items():
        transformed[k] = transform_item(item=v)
    return make_collection(values=transformed)


@workflow
def wf(collection: MyCollection = MyCollection(stuff={"A": 1.2, "B": 3.4, "C": -1.4})) -> MyCollection:
    return transform_collection(collection=collection)

thomasjpfan • Feb 27 '25 18:02

@thomasjpfan Can you explain the workaround a bit more? It is functionally equivalent to my example and doesn't solve the problem of needing the make_collection task.

madwed-stripe • Mar 05 '25 00:03

Sorry, I misunderstood the issue. My workaround does not resolve your issue with having to use another task to construct the dataclass.

thomasjpfan • Mar 05 '25 01:03

This would be a pretty cool feature. We'd need to figure out how to wrap the instantiation of dataclasses that involve promises (and potentially other types, though we could begin with just dataclasses).
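To make the idea of "wrapping instantiation" concrete, here is a minimal sketch in plain Python. It does not use flytekit at all: FakePromise, DeferredDataclass, and construct are hypothetical names invented for illustration, and nothing here reflects how Flyte actually represents promises or bindings. It only shows one possible shape: build the dataclass immediately when every field is concrete, and otherwise record the class plus its field bindings so they can be resolved later.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any


class FakePromise:
    """Hypothetical stand-in for a task output that is not known yet."""

    def __init__(self, ref: str) -> None:
        self.ref = ref


def contains_promise(value: Any) -> bool:
    """Return True if value is a FakePromise or nests one in a dict/list/tuple."""
    if isinstance(value, FakePromise):
        return True
    if isinstance(value, dict):
        return any(contains_promise(v) for v in value.values())
    if isinstance(value, (list, tuple)):
        return any(contains_promise(v) for v in value)
    return False


@dataclass
class DeferredDataclass:
    """Hypothetical record: which dataclass to build, and from which bindings."""

    cls: type
    bindings: dict[str, Any]


def construct(cls: type, **kwargs: Any) -> Any:
    """Build cls now if all fields are concrete; otherwise defer construction."""
    if not is_dataclass(cls):
        raise TypeError(f"{cls!r} is not a dataclass")
    unknown = set(kwargs) - {f.name for f in fields(cls)}
    if unknown:
        raise TypeError(f"unexpected fields: {sorted(unknown)}")
    if any(contains_promise(v) for v in kwargs.values()):
        # Assumption: a real engine would turn this into output bindings.
        return DeferredDataclass(cls=cls, bindings=kwargs)
    return cls(**kwargs)


@dataclass
class MyCollection:
    values: dict[str, float]


deferred = construct(MyCollection, values={"a": FakePromise("n0.o0")})
concrete = construct(MyCollection, values={"a": 1.0})
```

In this sketch, deferred is a DeferredDataclass holding the unresolved bindings, while concrete is an ordinary MyCollection instance.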

eapolinario • Mar 12 '25 18:03

@madwed-stripe Do you currently just add a task at the end of the dynamic workflow to create a dataclass from all the promises?

CtfChan • May 22 '25 03:05

Yes, that's right. We just add a task at the end of the dynamic today. It'd be awesome to be able to do this even if it came with some caveats, such as __post_init__ logic not running.
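For anyone wondering what that caveat would mean in practice, here is a small plain-Python sketch. It assumes a hypothetical engine that assembles the dataclass field by field once the promises resolve, instead of calling the class constructor. build_without_init and Checked are made-up names for illustration, and this is not a claim about how flytekit builds dataclasses.

```python
from dataclasses import dataclass, fields
from typing import Any


@dataclass
class Checked:
    values: dict[str, float]

    def __post_init__(self) -> None:
        # Validation that only runs when the generated __init__ is called.
        if not self.values:
            raise ValueError("values must not be empty")


def build_without_init(cls: type, **resolved: Any) -> Any:
    """Populate fields directly, skipping __init__ and thus __post_init__."""
    obj = object.__new__(cls)
    for f in fields(cls):
        object.__setattr__(obj, f.name, resolved[f.name])
    return obj


# The normal constructor rejects an empty dict via __post_init__ ...
try:
    Checked(values={})
    constructor_rejected = False
except ValueError:
    constructor_rejected = True

# ... but the field-by-field path silently accepts it.
bypassed = build_without_init(Checked, values={})
```

Under that assumption, any validation or derived-field logic in __post_init__ would have to be re-run explicitly or moved into a task.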

madwed-stripe • May 22 '25 20:05