flyte
flyte copied to clipboard
[Core feature] Allow caching of launchplans/subworkflows
Motivation: Why do you think this is important?
Currently caching happens at the task level, this means from entire workflows which can be cached incur the cost of running the entire DAG.
Goal: What should the final outcome look like, ideally?
Caching is configurable at the workflow/launch plan level. Also, might make sense to supply a with_overrides
flag to allow modifying cache behavior.
Describe alternatives you've considered
NA
Propose: Link/Inline OR Additional context
No response
Are you sure this issue hasn't been raised already?
- [X] Yes
Have you read the Code of Conduct?
- [X] Yes
This is something we've been thinking about for a while and do want to do. Thanks for the issue.
Agree this would be useful. Is this equivalent to caching every task within the workflow (assuming tasks are deterministic)?
yes makes sense to me
@dylanwilder / @geodavic do you think this is something that should be done implicitly or explicitly? For example, if all of the top-level tasks in a subworkflow are cacheable then we could infer that the subworkflow is cacheable. The explicit approach would require an API update (that will have to be discussed) as to how subworkflows / launchplans are marked as cacheable when used by another workflow.
@hamersaw The only concern with the implicit approach would be any other side effects from running the dag might be skipped (eg flyte events emitted, or execution metadata in the api) potentially breaking consumers of these apis. I see there is ongoing work to remove cache hit overhead, does that work supersede this?
Sure, I think explicit makes sure there is not unintended behavior.
I see there is ongoing work to remove cache hit overhead, does that work supersede this?
Yeah, thanks for looking! Unfortunately, I had to take a break from it, but will revisit shortly. That work involves moving the cache lookup from the task execution level to the node execution level. So not necessarily supersede, but definitely precede. Once that is complete, adding support for caching subworkflows and launchplans is more or less trivial.
Hello 👋, This issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will close the issue if we detect no activity in the next 7 days. Thank you for your contribution and understanding! 🙏
Hello 👋, This issue has been inactive for over 9 months and hasn't received any updates since it was marked as stale. We'll be closing this issue for now, but if you believe this issue is still relevant, please feel free to reopen it. Thank you for your contribution and understanding! 🙏
For a little context on the lift for this work. In the fastcache PR we moved cache lookups within Flyte from the task-level to the node-level. This decision was multi-faceted; (1) it sped up cache lookups because tasks did not need to undergo phase transitions, rather nodes went directly to SUCCEEDED
and (2) enabling caching of other Flyte node types.
The sequence to get this working for subworkflows / launchplans requires touching a few Flyte components and can be layed out as such:
-
Allow subworkflows / launchplans to be labelled as "cacheable": Currently, tasks use the discoverable field which is set in the task decorator (ie.
@task(cache=True)
) and ultimately populates theTaskMetadata
struct in the task protobuf definition. The mechanism to determine if a subworkflow / launchplan is "cacheable" will need to be agreed upon and the correct fields to support it added to FlyteIDL and subsequently populated by flytekit. -
Implement
CacheableNodeHandler
interface forworkflowNodeHandler
: Within Flyte execution each node is processed by a specific node handler, the workflowNodeHandler is responsible for subworkflows and launchplans. As part of thefastcache
work we added an interface, namely CacheableNodeHandler, which determines (1) if a node is cacheable or not, so given a field in the FlyteIDL defintion this can be a simple field lookup, and (2) what is the uniqueCacheKey
for this instance of the node, this value uniquely identifies a cached item including the entity version, etc. A sample implementation for theTaskHandler
(for executing Flyte tasks) can be found here. -
It would be ideal to clean up FlyteIDL cache status storage: Currently, cache status is stored as part of the task execution metadata and ultimately populates this information in the protobuf definition. Before
fastcache
(and this work) only tasks were cacheable, so it made sense to store this information at the task-level. However, while it will still work (ie. correctly display in UI, etc) it doesn't make much sense for a subworkflow / launchplan execution to populate the task execution info. This field should probably instead be moved to the node execution information.