[Core feature] Support simple dataclass construction from promises in workflow/dynamic
Motivation: Why do you think this is important?
A common pain point for our systems is the need to add a create_dataclass task at the end of a dynamic workflow. It would be wonderful to support dataclass construction from promises inside a @dynamic (and inside a @workflow).
Goal: What should the final outcome look like, ideally?
Here's a simple example of what we need to do today:
from dataclasses import dataclass

from flytekit import dynamic, task

@dataclass
class MyCollection:
    values: dict[str, float]

@task
def transform_item(item: float) -> float:
    return 1.0

# Extra task whose only job is to build the dataclass from resolved values
@task
def make_collection(values: dict[str, float]) -> MyCollection:
    return MyCollection(values)

@dynamic
def transform_collection(collection: MyCollection) -> MyCollection:
    transformed = {k: transform_item(item=v) for k, v in collection.values.items()}
    return make_collection(values=transformed)
We run a ton of these tasks and the host setup time for the extra make_collection task adds up. We'd like to be able to do:
@dynamic
def transform_collection(collection: MyCollection) -> MyCollection:
    transformed = {k: transform_item(item=v) for k, v in collection.values.items()}
    return MyCollection(transformed)
Describe alternatives you've considered
Maybe this would be possible with a custom transformer, but I'd rather this be part of Flyte core. We could also achieve something like this with @eager, but then we would need to keep the dynamic host around for the duration of the contained transform_item operations, and those can take quite a long time.
Propose: Link/Inline OR Additional context
No response
Are you sure this issue hasn't been raised already?
- [x] Yes
Have you read the Code of Conduct?
- [x] Yes
A workaround is to start with an empty dict, iterate through the tasks, and add the results to the dict in the dynamic workflow:
from dataclasses import dataclass

from flytekit import task, dynamic, workflow

@dataclass
class MyCollection:
    stuff: dict[str, float]

@task
def transform_item(item: float) -> float:
    return 2.0 * item

@task
def make_collection(values: dict[str, float]) -> MyCollection:
    return MyCollection(values)

@dynamic
def transform_collection(collection: MyCollection) -> MyCollection:
    transformed = {}
    for k, v in collection.stuff.items():
        transformed[k] = transform_item(item=v)
    return make_collection(values=transformed)

@workflow
def wf(collection: MyCollection = MyCollection(stuff={"A": 1.2, "B": 3.4, "C": -1.4})) -> MyCollection:
    return transform_collection(collection=collection)
@thomasjpfan Can you explain the workaround a bit more? It is functionally equivalent to my example and doesn't solve the problem of needing the make_collection task.
Sorry, I misunderstood the issue. My workaround does not resolve your issue with having to use another task to construct the dataclass.
This would be a pretty cool feature. We'd need to figure out how to wrap the instantiation of dataclasses that involve promises (and potentially other types, but we could begin with just dataclasses).
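One piece of that wrapping would be detecting when a dataclass's fields contain unresolved promises. Below is a minimal, pure-Python sketch of such a check. It is only an illustration: FakePromise and contains_promise are hypothetical names made up for this example, not flytekit APIs, and flytekit's real promise objects may need different handling.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any


class FakePromise:
    """Hypothetical stand-in for an unresolved task output (not flytekit's Promise)."""

    def __init__(self, name: str):
        self.name = name


def contains_promise(value: Any) -> bool:
    """Recursively check whether a value holds any unresolved FakePromise."""
    if isinstance(value, FakePromise):
        return True
    if isinstance(value, dict):
        return any(contains_promise(v) for v in value.values())
    if isinstance(value, (list, tuple)):
        return any(contains_promise(v) for v in value)
    # Only inspect dataclass instances, not dataclass types themselves
    if is_dataclass(value) and not isinstance(value, type):
        return any(contains_promise(getattr(value, f.name)) for f in fields(value))
    return False


@dataclass
class MyCollection:
    values: dict[str, float]


resolved = MyCollection({"A": 1.0})
pending = MyCollection({"A": FakePromise("transform_item_output"), "B": 2.0})

print(contains_promise(resolved))
print(contains_promise(pending))
```

A real implementation would presumably use a check like this to decide whether a returned dataclass can be serialized directly or has to be bound field by field to upstream task outputs.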
@madwed-stripe Do you currently just add a task at the end of the dynamic workflow to create a dataclass from all the promises?
Yes, that's right. We just add a task at the end of the dynamic today. It'd be awesome to do this even if it came with some caveats, such as __post_init__ logic not running.
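To illustrate why the __post_init__ caveat might be needed: validation in __post_init__ usually expects concrete values, so it can fail when a field still holds a placeholder. The sketch below uses plain Python only; construct_without_init is a hypothetical helper invented for this example, and the placeholder object merely stands in for a promise. It is not how flytekit does or would implement this.

```python
from dataclasses import dataclass, fields
from typing import Any


@dataclass
class Checked:
    values: dict[str, float]

    def __post_init__(self):
        # Validation that only makes sense once the values are concrete floats
        if any(v < 0 for v in self.values.values()):
            raise ValueError("negative value")


def construct_without_init(cls: type, **kwargs: Any):
    """Hypothetical helper: build an instance, skipping __init__ and __post_init__."""
    obj = object.__new__(cls)
    for f in fields(cls):
        object.__setattr__(obj, f.name, kwargs[f.name])
    return obj


placeholder = object()  # stands in for an unresolved promise

# Normal construction runs __post_init__, which cannot compare the placeholder to 0
try:
    Checked(values={"A": placeholder})
    normal_construction_failed = False
except TypeError:
    normal_construction_failed = True

# Bypassing __init__ stores the placeholder without running any validation
deferred = construct_without_init(Checked, values={"A": placeholder})
```

The trade-off is the one mentioned above: the instance exists, but any invariants that __post_init__ would enforce are not checked at construction time.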