flytekit
Local execution with compiled graph (#4080)
TL;DR
Support a user-defined execution sequence in local pyflyte. Subworkflows and tasks can now be sequenced with the >>
operator within a workflow. For example:
@workflow()
def wf():
    a = print_a()  # task: print("a")
    b = print_b()  # task: print("b")
    b >> a

wf()  # should output "b\na"
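To illustrate the idea outside of flytekit, here is a minimal sketch of how a `>>` operator could record an ordering constraint and how that constraint could then be resolved with a topological sort. `ToyNode` and `run_in_order` are hypothetical names made up for this example; they are not flytekit classes, and the sketch uses the standard library's `graphlib` rather than the code in this PR.

```python
from graphlib import CycleError, TopologicalSorter


class ToyNode:
    """Hypothetical stand-in for a workflow node (not flytekit's Node)."""

    def __init__(self, name, fn):
        self.name = name
        self.fn = fn
        self.upstream = []  # nodes that must run before this one

    def __rshift__(self, other):
        # `self >> other` records that `self` runs before `other`.
        other.upstream.append(self)
        return other


def run_in_order(nodes):
    """Run each node after all of its upstream nodes; return the order used."""
    # TopologicalSorter expects a mapping of node -> predecessors.
    graph = {node: node.upstream for node in nodes}
    try:
        order = list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        raise ValueError("cycle detected between nodes") from exc
    for node in order:
        node.fn()
    return [node.name for node in order]


log = []
a = ToyNode("a", lambda: log.append("a"))
b = ToyNode("b", lambda: log.append("b"))
b >> a  # b must run before a, even though a was created first
executed = run_in_order([a, b])
print("\n".join(log))
```

With the single `b >> a` constraint, `b` is run first and `a` second, which matches the expected output of the workflow above.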
Type
- [ ] Bug Fix
- [x] Feature
- [ ] Plugin
Are all requirements met?
- [x] Code completed
- [ ] Smoke tested
- [x] Unit tests added
- [ ] Code documentation added
- [x] Any pending items have an associated Issue
Complete description
- Support `BranchNode` local execution by adding `ConditionalSection` in `__init__`.
- Add `__call__` in `BranchNode` to evaluate all its expressions via kwargs.
- Evaluate all the `_lhs` and `_rhs` of `ComparisonExpression` and `ConjunctionExpression` via DFS.
- Support local execution on `Gate`. --> Needs checking; I just use the first key in kwargs as the output.
- Implemented the core algorithm:
  a. Do a topological sort on `workflow._nodes` and check whether a cycle exists.
  b. Follow the implementation in `ImperativeWorkflow`.
  c. Support `dynamic` by compiling one more time.
  d. Recursive execution with `BranchNode`.
  e. Modify the dynamic local execution flow.
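The DFS evaluation of comparison and conjunction expressions mentioned above can be sketched roughly as follows. This is only an illustration under assumptions: `Ref`, `Comparison`, `Conjunction`, and `evaluate` are hypothetical names for this example and do not mirror the actual fields or behavior of flytekit's `ComparisonExpression` and `ConjunctionExpression`.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class Ref:
    """Hypothetical placeholder for a value looked up by name in kwargs."""
    name: str


@dataclass
class Comparison:
    """Hypothetical leaf-level expression, e.g. x > 0."""
    lhs: Any  # a Ref or a literal
    op: Callable[[Any, Any], bool]
    rhs: Any  # a Ref or a literal


@dataclass
class Conjunction:
    """Hypothetical combination of two sub-expressions with and/or."""
    lhs: Any  # a Comparison or a Conjunction
    op: Callable[[bool, bool], bool]
    rhs: Any  # a Comparison or a Conjunction


def evaluate(expr: Any, kwargs: Dict[str, Any]) -> Any:
    """Depth-first evaluation: resolve both sides, then apply the operator."""
    if isinstance(expr, (Comparison, Conjunction)):
        left = evaluate(expr.lhs, kwargs)
        right = evaluate(expr.rhs, kwargs)
        return expr.op(left, right)
    if isinstance(expr, Ref):
        return kwargs[expr.name]  # bind the reference to a concrete input
    return expr  # plain literal


# (x > 0) and (y == "ok")
expr = Conjunction(
    Comparison(Ref("x"), operator.gt, 0),
    operator.and_,
    Comparison(Ref("y"), operator.eq, "ok"),
)
result_true = evaluate(expr, {"x": 3, "y": "ok"})
result_false = evaluate(expr, {"x": -1, "y": "ok"})
```

A branch node could then call something like `evaluate` on each condition in turn and execute the first branch whose condition holds, which is one plausible reading of "eval all its expression via kwargs".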
Tracking Issue
https://github.com/flyteorg/flyte/issues/4080
Follow-up issue
NA
OR
https://github.com/flyteorg/flyte/issues/
Codecov Report
Attention: 119 lines in your changes are missing coverage. Please review.
Comparison is base (744c167) 85.80% compared to head (602a55b) 54.25%. Report is 44 commits behind head on master.
Additional details and impacted files
@@             Coverage Diff             @@
##           master    #1917       +/-   ##
===========================================
- Coverage   85.80%   54.25%   -31.55%
===========================================
  Files         313      173     -140
  Lines       23278    16937    -6341
  Branches     3526     3502      -24
===========================================
- Hits        19973     9190   -10783
- Misses       2702     7332    +4630
+ Partials      603      415     -188
I ran all the workflow test files after replacing the execution function with the new one, execution_with_graph. Almost all the tests pass, except for the following nine. I dug into the failing cases, and they fall into three categories. (@pingsutw)
- Failing condition on BranchNode. --> This also fails on remote. (The failure message is not shown for now.)
- Type checking. --> This cannot be checked using node-based execution.
@workflow
def wf():
    return 1
## should raise error
- Regex error. --> The missing line cannot be recovered by node-based execution.
@eapolinario @pingsutw @wild-endeavor can one of you please review?