
[BUG] map_task / ArrayNode workflows cannot be used with FlyteRemote

Open cosmicBboy opened this issue 1 month ago • 1 comment

Describe the bug

Calling remote.wait(ex) on an execution that contains an ArrayNode raises an AttributeError while syncing node executions:

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/remote/remote.py:1891: in wait
    execution = self.sync_execution(execution, sync_nodes=sync_nodes)
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/remote/remote.py:1986: in sync_execution
    node_execs[n.id.node_id] = self.sync_node_execution(n, node_mapping)  # noqa
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/remote/remote.py:2116: in sync_node_execution
    logger.error(f"NE {execution} undeterminable, {type(execution._node)}, {execution._node}")
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/common.py:54: in __str__
    return self.verbose_string()
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/common.py:71: in verbose_string
    return self.short_string()
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/common.py:63: in short_string
    literal_str = re.sub(r"\s+", " ", str(self.to_flyte_idl())).strip()
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/core/workflow.py:569: in to_flyte_idl
    array_node=self.array_node.to_flyte_idl() if self.array_node else None,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <[AttributeError("'int' object has no attribute 'to_flyte_idl'") raised in repr()] FlyteArrayNode object at 0x111393750>

    def to_flyte_idl(self) -> _core_workflow.ArrayNode:
        return _core_workflow.ArrayNode(
>           node=self._node.to_flyte_idl() if self._node is not None else None,
            parallelism=self._parallelism,
            min_successes=self._min_successes,
            min_success_ratio=self._min_success_ratio,
        )
E       AttributeError: 'int' object has no attribute 'to_flyte_idl'

../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/core/workflow.py:400: AttributeError

This should work.
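
To make the traceback easier to read, here is a minimal, flytekit-free sketch of the failure mode it shows. FakeArrayNode and describe are hypothetical stand-ins, not flytekit code. The sketch assumes only what the traceback reports: `_node` holds an int, and the error surfaces while the object is being formatted into a log message. How the int ends up in `_node` is not established here.

```python
import re


class FakeArrayNode:
    """Hypothetical stand-in for an ArrayNode model; not flytekit code."""

    def __init__(self, node, parallelism=None):
        self._node = node
        self._parallelism = parallelism

    def to_flyte_idl(self):
        # Mirrors the failing line: assumes `_node` is a model object with
        # its own to_flyte_idl(), which an int does not have.
        return {
            "node": self._node.to_flyte_idl() if self._node is not None else None,
            "parallelism": self._parallelism,
        }

    def __str__(self):
        # Like short_string() in the traceback, __str__ serializes first.
        return re.sub(r"\s+", " ", str(self.to_flyte_idl())).strip()


def describe(execution_node):
    """Format a log message the way the failing logger.error call does."""
    return f"NE undeterminable, {type(execution_node)}, {execution_node}"


broken = FakeArrayNode(node=3)  # an int where a node model is expected

error_message = None
try:
    describe(broken)  # the f-string calls __str__, which calls to_flyte_idl()
except AttributeError as exc:
    error_message = str(exc)

print(error_message)
```

The point of the sketch is that the exception is raised by string formatting inside the log call, so the original "undeterminable" condition in sync_node_execution is masked by the serialization error.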

Expected behavior

Users should be able to use FlyteRemote to fetch/sync/wait on executions containing ArrayNodes.

Additional context to reproduce

Workflow file:

from functools import partial
from flytekit import task, map_task, workflow


@task
def fn(x: int, y: int) -> int:
    return x + y


@workflow
def workflow_with_maptask(data: list[int], y: int) -> list[int]:
    partial_fn = partial(fn, y=y)
    return map_task(partial_fn)(x=data)
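
For reference, the result the test below asserts on can be checked without Flyte. This is a plain-Python sketch of the same computation, assuming map_task applies the partial to each element of data in order; local_map is a hypothetical helper name, not part of flytekit.

```python
from functools import partial


def fn(x: int, y: int) -> int:
    # Same body as the @task above, without the Flyte decorator.
    return x + y


def local_map(data: list[int], y: int) -> list[int]:
    # Bind y once, then apply the partial to every element of data.
    partial_fn = partial(fn, y=y)
    return [partial_fn(x=item) for item in data]


expected = local_map([1, 2, 3, 4, 5], y=1)
print(expected)
```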

pytest test to reproduce:

import re
from datetime import timedelta
from subprocess import run

from flytekit import WorkflowExecutionPhase
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config


def test_workflow_with_maptask(workflows_dir):
    """Check that a map_task workflow can be run remotely and waited on.

    1. Run map_tasks.py
    2. Check that the output is a list of [x + y, ...]
    """
    result = run(
        [
            "unionai",
            "--config",
            "<path/to/config.yaml>",
            "run",
            "--remote",
            "map_tasks.py",
            "workflow_with_maptask",
            "--data",
            "[1, 2, 3, 4, 5]",
            "--y",
            "1",
        ],
        cwd=workflows_dir, text=True, check=True, capture_output=True
    )
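    match = re.search(r"executions/(\w+)", result.stdout)
    # Fail with a readable message if the CLI output has no execution URL.
    assert match is not None, f"no execution id found in: {result.stdout!r}"

    execution_id = match.group(1)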
    remote = FlyteRemote(
        config=Config.for_endpoint("dns:///demo.hosted.unionai.cloud"),
        default_project="flytesnacks",
        default_domain="development",
    )

    ex = remote.fetch_execution(name=execution_id)
    ex = remote.wait(ex, poll_interval=timedelta(seconds=1))  # 👈 error happens here

    assert ex.closure.phase == WorkflowExecutionPhase.SUCCEEDED
    assert ex.outputs["o0"] == [2, 3, 4, 5, 6]

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • [X] Yes

Have you read the Code of Conduct?

  • [X] Yes

cosmicBboy · May 13 '24 21:05