Set upstream node for branch node
Signed-off-by: Kevin Su [email protected]
TL;DR
We didn't set the upstream node for the branch node. Therefore, the DAG of the following example is wrong,
from flytekit import task, workflow, conditional
@task
def square(n: float) -> float:
return n * n
@task
def double(n: float) -> float:
return 2 * n
@task
def t1() -> float:
return 3.0
@workflow
def wf(my_input: float) -> float:
val = t1()
return (
conditional("fractions")
.if_((my_input >= 0.1) & (my_input <= 1.0))
.then(double(n=val))
.else_()
.then(square(n=my_input))
)
Before:

After:

For more detail, check https://flyte-org.slack.com/archives/C03CY9S9MFE/p1651262585261169 and https://flyte-org.slack.com/archives/C03CY9S9MFE/p1651307245028089
Type
- [x] Bug Fix
- [ ] Feature
- [ ] Plugin
Are all requirements met?
- [x] Code completed
- [ ] Smoke tested
- [ ] Unit tests added
- [ ] Code documentation added
- [ ] Any pending items have an associated Issue
Complete description
How did you fix the bug, make the feature etc. Link to any design docs etc
Tracking Issue
https://github.com/flyteorg/flyte/issues/
Follow-up issue
NA
Codecov Report
Merging #986 (a1c59b9) into master (156e34f) will increase coverage by
0.00%. The diff coverage is100.00%.
@@ Coverage Diff @@
## master #986 +/- ##
=======================================
Coverage 86.28% 86.28%
=======================================
Files 252 252
Lines 24137 24146 +9
Branches 2747 2750 +3
=======================================
+ Hits 20826 20835 +9
Misses 2843 2843
Partials 468 468
| Impacted Files | Coverage Δ | |
|---|---|---|
| flytekit/core/promise.py | 75.20% <ø> (ø) |
|
| flytekit/core/condition.py | 89.41% <100.00%> (+0.16%) |
:arrow_up: |
| tests/flytekit/unit/core/test_conditions.py | 98.73% <100.00%> (+0.02%) |
:arrow_up: |
Continue to review full report at Codecov.
Legend - Click here to learn more
Δ = absolute <relative> (impact),ø = not affected,? = missing dataPowered by Codecov. Last update 156e34f...a1c59b9. Read the comment docs.
using your example, are these equivalent?
@workflow
def wf(my_input: float) -> float:
val = t1()
return (
conditional("fractions")
.if_((my_input >= 0.1) & (my_input <= 1.0))
.then(double(n=val))
.else_()
.then(square(n=my_input))
)
@workflow
def wf(my_input: float) -> float:
return (
conditional("fractions")
.if_((my_input >= 0.1) & (my_input <= 1.0))
.then(double(n=t1()))
.else_()
.then(square(n=my_input))
)
@kumare3 @pingsutw
yeah, but if IIRC, the second one doesn't work, right?
should they be equivalent?
cc @wild-endeavor
sorry, I think I was wrong. They are not quite equivalent
In the first one, we'll run t1() no matter the condition is true or false.
In the second one, we'll run t1() only when the condition is true. Therefore, we should make t1 become part of the branch node as well. We don't support it for now, right?
which means we can't have a workflow like
if -> t1 -> t2
start -> -> end
else -> t3
For now, we can only have exactly one task in if block and else block.
yes @pingsutw agreed. @kumare3 @enghabu what should we do about this?
I don't think the problem with the example above is the graph... it's in the execution path...
If you look at the generated DAG, it has an upstream dependency between n1-n0 (the first branch) to n0 which should cause that node to wait until data from n0 become available before proceeding.

Somewhere between this and the Node Handler this breaks....
Please let me know if you need further help investigating this.