Add support cache for dynamic spec
Tracking issue
Closes #5543
Why are the changes needed?
When executing a dynamic workflow in the Flyte backend, compiling the future.pb file before sub-node execution takes some time. If an out-of-memory (OOM) error occurs during sub-node execution, re-running the dynamic workflow requires recompiling the future.pb file, consuming additional time and computational resources. To address this, we propose caching the compiled future.pb file.
The future cache mechanism in this PR stores the compiled file as artifact data in Flyte storage via the datacatalog. The cache key, TagName, is determined by the hash of the input values. The diagrams below illustrate the data flow in the backend system:
What changes were proposed in this pull request?
Please refer to bellow diagram for understanding the procedure of how future read/write:
key changes: 1.Added Mode to TaskTemplate Metadata in flyteidl to indicate whether a task is a DynamicTask. 2.Implemented logic in flytepropeller to send cache read/write requests to datacatalog. 3.Added cache read/write functionality to the storage layer in datacatalog.
How was this patch tested?
To test how much time it saved when future file cache hit, we modify this block of code to count the time spend to run taskNodeHandler.Handle for test purpose.
startTime := time.Now()
if !isDynamic || cacheStatus == nil || cacheStatus.GetCacheStatus() != core.CatalogCacheStatus_CACHE_HIT {
var err error
taskTrns, err := d.TaskNodeHandler.Handle(ctx, nCtx)
trns = &taskTrns
if err != nil {
logger.Debug(ctx, "Failed to compile dynamic workflow")
return *trns, prevState, err
}
}
if isDynamic {
duration := time.Since(startTime)
logger.Infof(ctx, "Dynamic node compilation took %v", duration)
}
To mimic OOM, we will throw panic in dynamicNodeHandler. So the Dynamic workflow will be aborted after future file compiled
func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, nCtx interfaces.NodeExecutionContext, prevState handler.DynamicNodeState) (handler.Transition, handler.DynamicNodeState, error) {
panic("throw panic here to mimic OOM")
# existing code…
then we run bellow test dynamic workflow for the first time
import typing
from flytekit import dynamic, task, workflow
@task
def return_index(character: str) -> int:
"""
Computes the character index (which needs to fit into the 26 characters list)
"""
if character.islower():
return ord(character) - ord("a")
else:
return ord(character) - ord("A")
@task
def update_list(freq_list: typing.List[int], list_index: int) -> typing.List[int]:
"""
Notes the frequency of characters
"""
freq_list[list_index] += 1
return freq_list
@task
def derive_count(freq1: typing.List[int], freq2: typing.List[int]) -> int:
"""
Derives the number of common characters
"""
count = 0
for i in range(26):
count += min(freq1[i], freq2[i])
return count
@dynamic(cache_version="4", cache=True)
def count_characters(s1: str, s2: str) -> int:
"""
Calls the required tasks and returns the final result
"""
# s1 and s2 are accessible
# initiliaze an empty list consisting of 26 empty slots corresponding to every alphabet (lower and upper case)
freq1 = [0] * 26
freq2 = [0] * 26
# looping through the string s1
for i in range(len(s1)):
# index and freq1 are not accesible as they are promises
index = return_index(character=s1[i])
freq1 = update_list(freq_list=freq1, list_index=index)
# looping through the string s2
for i in range(len(s2)):
# index and freq2 are not accesible as they are promises
index = return_index(character=s2[i])
freq2 = update_list(freq_list=freq2, list_index=index)
# counting the common characters
return derive_count(freq1=freq1, freq2=freq2)
@workflow
def wf(s1: str, s2: str) -> int:
"""
Calls the dynamic workflow and returns the result
"""
# sending two strings to the workflow
return count_characters(s1=s1, s2=s2)
We use the command pyflyte run --remote dynamic.py wf --s1="dynamic" --s2="test"
The first run took around 17 ms to compile future file
Then we remove the panic throwing and re-execute the same workflow with the same input. The future cache hit and it only took 83ns to go through the taskNodeHandler.Handle code block, which means the taskNodeHandler.Handle is skipped.
Moreover, unit tests has been added to make sure cache behaviors.
Labels
Please add one or more of the following labels to categorize your PR:
- added: For new features.
This is important to improve the readability of release notes.
Check all the applicable boxes
- [X] All new and existing tests passed.
- [X] All commits are signed-off.
Related PRs
flytekit PR
Code Review Agent Run Status
- Limitations and other issues: ❌ Failure - Bito Code Review Agent didn't review this pull request automatically because it exceeded the size limit. No action is needed if you didn't intend for the agent to review it. Otherwise, you can initiate the review by typing
/reviewin a comment below.
Codecov Report
Attention: Patch coverage is 69.55530% with 267 lines in your changes missing coverage. Please review.
Project coverage is 58.59%. Comparing base (
8f02ac6) to head (5e8e14d).
Additional details and impacted files
@@ Coverage Diff @@
## master #6372 +/- ##
==========================================
+ Coverage 58.48% 58.59% +0.10%
==========================================
Files 940 944 +4
Lines 71584 72442 +858
==========================================
+ Hits 41867 42444 +577
- Misses 26534 26750 +216
- Partials 3183 3248 +65
| Flag | Coverage Δ | |
|---|---|---|
| unittests-datacatalog | 59.58% <63.35%> (+0.55%) |
:arrow_up: |
| unittests-flyteadmin | 56.20% <ø> (-0.06%) |
:arrow_down: |
| unittests-flytecopilot | 30.99% <ø> (ø) |
|
| unittests-flytectl | 64.72% <ø> (ø) |
|
| unittests-flyteidl | 76.12% <ø> (ø) |
|
| unittests-flyteplugins | 60.95% <ø> (ø) |
|
| unittests-flytepropeller | 55.24% <73.15%> (+0.46%) |
:arrow_up: |
| unittests-flytestdlib | 64.02% <ø> (-0.02%) |
:arrow_down: |
Flags with carried forward coverage won't be shown. Click here to find out more.
:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.
:rocket: New features to boost your workflow:
- :snowflake: Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
Code Review Agent Run Status
- Limitations and other issues: ❌ Failure - Bito Code Review Agent didn't review this pull request automatically because it exceeded the size limit. No action is needed if you didn't intend for the agent to review it. Otherwise, you can initiate the review by typing
/reviewin a comment below.
Code Review Agent Run Status
- Limitations and other issues: ❌ Failure - Bito Code Review Agent didn't review this pull request automatically because it exceeded the size limit. No action is needed if you didn't intend for the agent to review it. Otherwise, you can initiate the review by typing
/reviewin a comment below.
Code Review Agent Run Status
- Limitations and other issues: ❌ Failure - Bito Code Review Agent didn't review this pull request automatically because it exceeded the size limit. No action is needed if you didn't intend for the agent to review it. Otherwise, you can initiate the review by typing
/reviewin a comment below.
This an interesting change @popojk. I have been working in this area lately and I'm trying to fully understand where this provides values.
When you say OOM happens during sub node execution are you referring to OOM in Propeller or OOM in the task?
I think during normal operation compiling the workflow only happens once so I'm skeptical if the change here is worth it.
This an interesting change @popojk. I have been working in this area lately and I'm trying to fully understand where this provides values.
When you say OOM happens during sub node execution are you referring to OOM in Propeller or OOM in the task?
I think during normal operation compiling the workflow only happens once so I'm skeptical if the change here is worth it.
Hi @Sovietaced . This PR is aiming to add a cache for future.pb that compiled by dynamic workflow. As far as I know, when a task in dynamic workflow OOM during execution, propeller will re-compile future.pb while re-executing the same dynamic workflow, leading to duplicated computation. Please let me know if you have any other suggestions. 😎