[BUG] Dynamic task name override does not work when multiple tasks are created

Open · gpgn opened this issue 2 years ago • 8 comments

Describe the bug

Task names can be overridden with .with_overrides(node_name="foo"). This is especially useful for dynamic tasks, because their nodes get cryptic identifiers such as d1-n-dn0 by default, which makes it difficult to tell in the UI which iteration a dynamically created task belongs to.

If multiple sub-tasks are created, the name override works initially for all tasks, but then fails for any tasks after the initial one.

Expected behavior

Overridden task names are visible throughout the UI.

Additional context to reproduce

from flytekit import Resources, WorkflowFailurePolicy, dynamic, task, workflow


@task(
    limits=Resources(mem="500Mi"),
)
def some_task(
    val: str,
):
    print(val)


@dynamic(limits=Resources(mem="500Mi"))
def base_workflow():
    # set up task promises
    for val in ["one", "two", "three"]:
        task_some_task = some_task(
            val=val,
        ).with_overrides(node_name=f"{val}-some-task")
        task_some_other_task = some_task(
            val=val,
        ).with_overrides(node_name=f"{val}-some-other-task")

        task_some_task >> task_some_other_task


@workflow(
    failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE,
)
def wf():
    base_workflow()

Are you sure this issue hasn't been raised already?

  • [X] Yes

Have you read the Code of Conduct?

  • [X] Yes

gpgn, Jul 12 '23 13:07

I was able to reproduce this with the following example:

from flytekit import dynamic, task, workflow

@task
def some_task(val: str):
    print(val)

@dynamic
def dyn_1():
    # set up task promises
    for val in ["one", "two", "three", "four"]:
        task_some_task = some_task(
            val=val,
        ).with_overrides(node_name=f"overridden-{val}")
        task_some_other_task = some_task(
            val=val,
        ).with_overrides(node_name=f"overridden-again-{val}")

@workflow
def wf():
    dyn_1()

I instrumented flytekit to emit the dynamic job spec and verified that the node IDs have the correct (overridden) names:

<FlyteLiteral nodes { id: "overridden-one" metadata { name: "some_task" retries { } } inputs { var: "val" binding { scalar { primitive { string_value: "one" } } } } task_node { reference_id { resource_type: TASK project: "flytesnacks" domain: "development" name: "workflows.example_failure.some_task" version: "0efvzH6fsrXAryjKNykmQQ==" } overrides { } } }
              nodes { id: "overridden-again-one" metadata { name: "some_task" retries { } } inputs { var: "val" binding { scalar { primitive { string_value: "one" } } } } task_node { reference_id { resource_type: TASK project: "flytesnacks" domain: "development" name: "workflows.example_failure.some_task" version: "0efvzH6fsrXAryjKNykmQQ==" } overrides { } } }
              nodes { id: "overridden-two" metadata { name: "some_task" retries { } } inputs { var: "val" binding { scalar { primitive { string_value: "two" } } } } task_node { reference_id { resource_type: TASK project: "flytesnacks" domain: "development" name: "workflows.example_failure.some_task" version: "0efvzH6fsrXAryjKNykmQQ==" } overrides { } } }
              nodes { id: "overridden-again-two" metadata { name: "some_task" retries { } } inputs { var: "val" binding { scalar { primitive { string_value: "two" } } } } task_node { reference_id { resource_type: TASK project: "flytesnacks" domain: "development" name: "workflows.example_failure.some_task" version: "0efvzH6fsrXAryjKNykmQQ==" } overrides { } } }
              nodes { id: "overridden-three" metadata { name: "some_task" retries { } } inputs { var: "val" binding { scalar { primitive { string_value: "three" } } } } task_node { reference_id { resource_type: TASK project: "flytesnacks" domain: "development" name: "workflows.example_failure.some_task" version: "0efvzH6fsrXAryjKNykmQQ==" } overrides { } } }
              nodes { id: "overridden-again-three" metadata { name: "some_task" retries { } } inputs { var: "val" binding { scalar { primitive { string_value: "three" } } } } task_node { reference_id { resource_type: TASK project: "flytesnacks" domain: "development" name: "workflows.example_failure.some_task" version: "0efvzH6fsrXAryjKNykmQQ==" } overrides { } } }
              nodes { id: "overridden-four" metadata { name: "some_task" retries { } } inputs { var: "val" binding { scalar { primitive { string_value: "four" } } } } task_node { reference_id { resource_type: TASK project: "flytesnacks" domain: "development" name: "workflows.example_failure.some_task" version: "0efvzH6fsrXAryjKNykmQQ==" } overrides { } } }
              nodes { id: "overridden-again-four" metadata { name: "some_task" retries { } } inputs { var: "val" binding { scalar { primitive { string_value: "four" } } } } task_node { reference_id { resource_type: TASK project: "flytesnacks" domain: "development" name: "workflows.example_failure.some_task" version: "0efvzH6fsrXAryjKNykmQQ==" } overrides { } } }
              min_successes: 8 tasks { id { resource_type: TASK project: "flytesnacks" domain: "development" name: "workflows.example_failure.some_task" version: "0efvzH6fsrXAryjKNykmQQ==" } type: "python-task" metadata { runtime { type: FLYTE_SDK version: "0.0.0+develop" flavor: "python" } retries { } } interface { inputs { variables { key: "val" value { type { simple: STRING } description: "val" } } } outputs { } } container { image: "localhost:30000/with-overrides-debug:v1" args: "pyflyte-fast-execute" args: "--additional-distribution" args: "s3://my-s3-bucket/flytesnacks/development/N4TPH3JNW5FMUZB7IWXVUOWDDQ======/script_mode.tar.gz" args: "--dest-dir" args: "/root" args: "--" args: "pyflyte-execute" args: "--inputs" args: "{{.input}}" args: "--output-prefix" args: "{{.outputPrefix}}" args: "--raw-output-data-prefix" args: "{{.rawOutputDataPrefix}}" args: "--checkpoint-path" args: "{{.checkpointOutputPrefix}}" args: "--prev-checkpoint" args: "{{.prevCheckpointPrefix}}" args: "--resolver" args: "flytekit.core.python_auto_container.default_task_resolver" args: "--" args: "task-module" args: "workflows.example_failure" args: "task-name" args: "some_task" resources { } } }>

Here's an execution in the development.uniondemo cluster: https://development.uniondemo.run/console/projects/flytesnacks/domains/development/executions/f0314db5485f0478e85b?duration=all. The node names are correct in the compiled workflow, but not in the actual node executions.

eapolinario, Jul 22 '23 00:07

@hamersaw, based on the investigation above we can rule out flytekit as the cause. Can you take a look when you have some time? I could also look into it if you gave me some pointers on where to look.

eapolinario, Jul 24 '23 17:07

I think the issue here has nothing to do with dynamics. We restrict node IDs to a maximum of 20 characters; IDs longer than that are hashed down to 8 characters. When we run:

from flytekit import dynamic, task, workflow

@task
def some_task(val: str):
    print(val)

@dynamic
def dyn_1():
    # set up task promises
    for val in ["one", "two", "three", "four"]:
        task_some_task = some_task(
            val=val,
        ).with_overrides(node_name=f"overridden-{val}")
        task_some_other_task = some_task(
            val=val,
        ).with_overrides(node_name=f"o-again-{val}")

@workflow
def wf():
    dyn_1()

only one node exceeds the 20-character limit.
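
For illustration, here is a minimal Python sketch of the length rule described above. It is not Flyte's implementation: shorten_node_id is a hypothetical helper, the strict "longer than 20 characters" cutoff is taken from the comment above, and SHA-256 with lowercase base32 stands in for whatever hash and encoding Flyte actually uses.

```python
import base64
import hashlib

# Limits as described in the comment above; everything else is assumed.
MAX_NODE_ID_LEN = 20
HASHED_NODE_ID_LEN = 8


def shorten_node_id(node_id: str) -> str:
    """Return node_id unchanged if it fits, else a deterministic short hash.

    Hypothetical illustration only: SHA-256 + lowercase base32 is an
    assumption, not the scheme Flyte uses internally.
    """
    if len(node_id) <= MAX_NODE_ID_LEN:
        # Short enough: the overridden name survives as-is.
        return node_id
    digest = hashlib.sha256(node_id.encode("utf-8")).digest()
    # base32 output uses only a-z and 2-7 once lowercased.
    encoded = base64.b32encode(digest).decode("ascii").lower()
    return encoded[:HASHED_NODE_ID_LEN]


if __name__ == "__main__":
    for name in [
        "012345678z",                      # 10 characters
        "012345678z012345678z",            # 20 characters
        "012345678z012345678z012345678z",  # 30 characters
    ]:
        print(f"{len(name):2} chars: {name} -> {shorten_node_id(name)}")
```

Under these assumptions, the 10- and 20-character names are kept unchanged and only the 30-character name is replaced by an 8-character hash. Because the hash is deterministic, the same long name always maps to the same short ID, which is why the overridden name seems to "disappear" rather than change randomly.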

hamersaw, Jul 24 '23 18:07

This pruning of node IDs only applies to dynamics, right?

In this example, the long node IDs are shortened only in the case of dynamics:

from flytekit import dynamic, task, workflow

@task
def t():
    ...

def invoke_tasks():
    t().with_overrides(node_name="012345678z")
    t().with_overrides(node_name="012345678z012345678z")
    t().with_overrides(node_name="012345678z012345678z012345678z")


@dynamic
def dyn_1():
    invoke_tasks()

@workflow
def sub_wf():
    invoke_tasks()

@workflow
def wf():
    invoke_tasks()
    dyn_1()
    sub_wf()

Here's what it looks like (screenshot from 2023-08-15, not preserved).

Notice that the 20-character restriction applies only to nodes defined as part of the @dynamic.

eapolinario, Jul 31 '23 18:07

I think this is a UI bug: the console is just grabbing the wrong field. When you expand one of the IDs longer than 20 characters at the parent or sub_wf level, the pane shows the shortened ID, but the node-level view shows the full value.

hamersaw, Aug 16 '23 17:08

@FrankFlitton, flyteconsole is pulling the wrong node names for nodes that are not spawned as part of a dynamic task. Notice in the example above how the Nodes view shows two different names for the same node.

For example, take a look at https://development.uniondemo.run/console/projects/flytesnacks/domains/development/executions/f47cbdb67e35b48298fd. If you expand the third node t, you'll see that the node names in the Nodes view and in the expanded tab do not match. The node name in the expanded tab is the correct one.

eapolinario, Aug 16 '23 18:08

Hello 👋, this issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will engage on it to decide if it is still applicable. Thank you for your contribution and understanding! 🙏

github-actions[bot], May 13 '24 00:05