
Set upstream node for branch node

Open pingsutw opened this issue 3 years ago • 7 comments

Signed-off-by: Kevin Su

TL;DR

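We didn't set the upstream node for the branch node, so the DAG generated for the following example is wrong: the branch consumes the output of t1 (val), but the branch node is not wired to depend on t1's node.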

from flytekit import task, workflow, conditional


@task
def square(n: float) -> float:
    return n * n


@task
def double(n: float) -> float:
    return 2 * n


@task
def t1() -> float:
    return 3.0


@workflow
def wf(my_input: float) -> float:
    val = t1()
    return (
        conditional("fractions")
            .if_((my_input >= 0.1) & (my_input <= 1.0))
            .then(double(n=val))
            .else_()
            .then(square(n=my_input))
    )
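To make the bug concrete, here is a minimal toy model in plain Python of what "setting the upstream node for the branch node" means: the branch node should depend on every node whose output is read by any case's condition or task. This is not flytekit's actual implementation; Node, Promise, BranchCase and branch_upstream_nodes are hypothetical names chosen for illustration, and the node IDs (n0 for t1, n1 for the branch) are assumed.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass(eq=False)  # eq=False keeps identity hashing, so nodes can go in sets
class Node:
    """Hypothetical DAG node: an ID plus the set of nodes it must wait for."""
    id: str
    upstream: Set["Node"] = field(default_factory=set)


@dataclass
class Promise:
    """Hypothetical stand-in for a value flowing through the workflow.

    producer is None for workflow inputs (they come from the start of the DAG).
    """
    name: str
    producer: Optional[Node] = None


@dataclass
class BranchCase:
    condition_inputs: List[Promise]  # promises read by the case's condition
    task_inputs: List[Promise]       # promises passed to the task in .then()


def branch_upstream_nodes(cases: List[BranchCase]) -> Set[Node]:
    """Collect every node whose output the branch reads, across all cases."""
    upstream: Set[Node] = set()
    for case in cases:
        for promise in case.condition_inputs + case.task_inputs:
            if promise.producer is not None:
                upstream.add(promise.producer)
    return upstream


# Toy version of wf: t1 is node n0, and its output val feeds double() in if_.
t1_node = Node("n0")
my_input = Promise("my_input")           # workflow input, no producer node
val = Promise("o0", producer=t1_node)    # output of t1()

cases = [
    BranchCase(condition_inputs=[my_input], task_inputs=[val]),  # double(n=val)
    BranchCase(condition_inputs=[], task_inputs=[my_input]),     # else_: square(n=my_input)
]

branch_node = Node("n1")
branch_node.upstream |= branch_upstream_nodes(cases)
print(sorted(n.id for n in branch_node.upstream))  # ['n0']
```

Every case is scanned, not just the one that will run, because which case is taken is only known at execution time.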

Before: [image of the DAG]

After: [image of the DAG]

For more detail, check https://flyte-org.slack.com/archives/C03CY9S9MFE/p1651262585261169 and https://flyte-org.slack.com/archives/C03CY9S9MFE/p1651307245028089

Type

  • [x] Bug Fix
  • [ ] Feature
  • [ ] Plugin

Are all requirements met?

  • [x] Code completed
  • [ ] Smoke tested
  • [ ] Unit tests added
  • [ ] Code documentation added
  • [ ] Any pending items have an associated Issue

Complete description

How did you fix the bug, make the feature etc. Link to any design docs etc

Tracking Issue

https://github.com/flyteorg/flyte/issues/

Follow-up issue

NA

pingsutw, May 03 '22 17:05

Codecov Report

Merging #986 (a1c59b9) into master (156e34f) will increase coverage by 0.00%. The diff coverage is 100.00%.

@@           Coverage Diff           @@
##           master     #986   +/-   ##
=======================================
  Coverage   86.28%   86.28%           
=======================================
  Files         252      252           
  Lines       24137    24146    +9     
  Branches     2747     2750    +3     
=======================================
+ Hits        20826    20835    +9     
  Misses       2843     2843           
  Partials      468      468           

Impacted Files                                 Coverage Δ
flytekit/core/promise.py                       75.20% <ø> (ø)
flytekit/core/condition.py                     89.41% <100.00%> (+0.16%) ↑
tests/flytekit/unit/core/test_conditions.py    98.73% <100.00%> (+0.02%) ↑


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update 156e34f...a1c59b9.

codecov[bot], May 03 '22 17:05

Using your example, are these two equivalent?

# Variant 1: t1() is called before the conditional
@workflow
def wf(my_input: float) -> float:
    val = t1()
    return (
        conditional("fractions")
            .if_((my_input >= 0.1) & (my_input <= 1.0))
            .then(double(n=val))
            .else_()
            .then(square(n=my_input))
    )


# Variant 2: t1() is called inside the if_ case
@workflow
def wf(my_input: float) -> float:
    return (
        conditional("fractions")
            .if_((my_input >= 0.1) & (my_input <= 1.0))
            .then(double(n=t1()))
            .else_()
            .then(square(n=my_input))
    )

@kumare3 @pingsutw

wild-endeavor, May 12 '22 18:05

Yeah, but IIRC the second one doesn't work, right?

pingsutw, May 12 '22 18:05

Should they be equivalent?

wild-endeavor, May 12 '22 19:05

cc @wild-endeavor Sorry, I think I was wrong: they are not quite equivalent. In the first one, t1() runs whether the condition is true or false. In the second one, t1() should run only when the condition is true, so t1 would need to become part of the branch node as well. We don't support that for now, right?

This means we can't have a workflow like:

            if   -> t1 -> t2
    start ->                 -> end
            else -> t3

For now, the if block and the else block can each contain exactly one task.
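The difference between the two variants can be sketched with a hypothetical toy executor in plain Python (not how flytekit or Flyte actually evaluate branches). In variant 1, t1 is evaluated before the branch, so it always runs; in variant 2, t1 belongs to the if_ case, so it runs only when that case is chosen. Each case here holds a list of steps, which also models the "if -> t1 -> t2" shape from the diagram; run_branch, Step and Case are illustrative names, not flytekit APIs.

```python
from typing import Any, Callable, List, Tuple

calls: List[str] = []  # records which tasks actually executed, in order


def t1() -> float:
    calls.append("t1")
    return 3.0


def double(n: float) -> float:
    calls.append("double")
    return 2 * n


def square(n: float) -> float:
    calls.append("square")
    return n * n


Step = Callable[[Any], Any]  # receives the previous step's output (None at first)
Case = Tuple[Callable[[], bool], List[Step]]


def run_branch(cases: List[Case], otherwise: List[Step]) -> Any:
    """Run only the steps of the first case whose predicate holds."""
    for predicate, steps in cases:
        if predicate():
            chosen = steps
            break
    else:
        chosen = otherwise
    value = None
    for step in chosen:
        value = step(value)
    return value


def wf_variant_1(my_input: float) -> Any:
    val = t1()  # evaluated before the branch, so it always runs
    return run_branch(
        [(lambda: 0.1 <= my_input <= 1.0, [lambda _: double(val)])],
        otherwise=[lambda _: square(my_input)],
    )


def wf_variant_2(my_input: float) -> Any:
    # t1 -> double is a two-step chain that belongs to the if_ case
    return run_branch(
        [(lambda: 0.1 <= my_input <= 1.0, [lambda _: t1(), double])],
        otherwise=[lambda _: square(my_input)],
    )


calls.clear()
wf_variant_1(5.0)
print(calls)  # ['t1', 'square']

calls.clear()
wf_variant_2(5.0)
print(calls)  # ['square']
```

With an input outside [0.1, 1.0], both variants return the same value, but only variant 1 pays for running t1.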

pingsutw, May 13 '22 09:05

Yes @pingsutw, agreed. @kumare3 @enghabu, what should we do about this?

wild-endeavor, May 22 '22 08:05

I don't think the problem with the example above is the graph; it's in the execution path. If you look at the generated DAG, it has an upstream dependency from n1-n0 (the first branch) to n0, which should cause that node to wait until data from n0 becomes available before proceeding.

[image of the generated DAG]

Somewhere between this and the Node Handler, this breaks.

Please let me know if you need further help investigating this.
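The readiness rule described above (a node may start only after every node it depends on has finished) can be sketched with a hypothetical in-memory scheduler. This is not Flyte's Node Handler; execution_rounds is an illustrative name, and the DAG below only reuses the node IDs from the comment (n0 for t1, n1-n0 for the first branch case).

```python
from typing import Dict, List, Set


def execution_rounds(upstream: Dict[str, Set[str]]) -> List[List[str]]:
    """Group nodes into rounds; a node is ready once all its upstream nodes are done.

    Raises ValueError if some nodes can never become ready (a cycle, or a
    dependency on a node that is not in the graph).
    """
    done: Set[str] = set()
    pending = dict(upstream)
    rounds: List[List[str]] = []
    while pending:
        ready = sorted(n for n, deps in pending.items() if deps <= done)
        if not ready:
            raise ValueError(f"nodes never become ready: {sorted(pending)}")
        rounds.append(ready)
        done.update(ready)
        for n in ready:
            del pending[n]
    return rounds


# With the edge n1-n0 -> n0, the branch case waits for t1's output.
print(execution_rounds({"n0": set(), "n1-n0": {"n0"}}))  # [['n0'], ['n1-n0']]

# Without the edge, both are ready at once, so the case could start before t1 finishes.
print(execution_rounds({"n0": set(), "n1-n0": set()}))  # [['n0', 'n1-n0']]
```

If the compiled DAG already contains the edge, as in the first call, then a correct executor should never start n1-n0 early, which is why the suspicion falls on the execution path rather than the graph.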

EngHabu, May 23 '22 20:05