prefect icon indicating copy to clipboard operation
prefect copied to clipboard

Argument of type "PrefectFuture[int, Sync]" cannot be assigned to parameter "i" of type "int" in function "__call__"

Open tekumara opened this issue 2 years ago • 5 comments

Description

pyright error:

basic_flow.py:12:18 - error: Argument of type "PrefectFuture[int, Sync]" cannot be assigned to parameter "i" of type "int" in function "__call__"
    "PrefectFuture[int, Sync]" is incompatible with "int" (reportGeneralTypeIssues)

mypy error:

basic_flow.py:12: error: No overload variant of "__call__" of "Task" matches argument type "PrefectFuture[None, Literal[False]]"
basic_flow.py:12: note: Possible overload variants:
basic_flow.py:12: note:     def __call__(self, i: int) -> PrefectFuture[None, Literal[False]]
basic_flow.py:12: note:     def __call__(self, i: int) -> Awaitable[PrefectFuture[<nothing>, Literal[True]]]

Reproduction / Example

basic_flow.py:

from prefect import flow, get_run_logger, task
from prefect.futures import PrefectFuture
from prefect.utilities.asyncio import Sync


@flow
def add_flow(i: int) -> PrefectFuture[int, Sync]:
    # result is PrefectFuture[int, Sync]
    result = add_one(i)

    # passing the result future will resolve to its int value
    print_result(result)    # <--- line 12: pyright/mypy errors here
    return result

@task
def add_one(i: int) -> int:
    return i + 1

@task
def print_result(i: int) -> None:
    logger = get_run_logger()
    logger.info(f"print_result: {i=}")

if __name__ == "__main__":
    add_flow(1)

prefect 2.0b8

tekumara avatar Jul 10 '22 06:07 tekumara

We automatically coerce futures into values for you, but that kind of magic isn't supported by type checkers. I'm not sure this is something we can reasonably address. To support this, we'd need to update the input signature of your task to make each argument a Union[Future[T], T]. I do not believe this is possible with Python parameter specs yet. We may be able to write a mypy plugin to make this feasible, but pyright does not support plugins.

zanieb avatar Jul 10 '22 16:07 zanieb

Hello I'm having the same issue, interested in a workaround

lucienfregosibodyguard avatar Aug 10 '22 16:08 lucienfregosibodyguard

I'm certain this will require a mypy plugin. If someone is interested in taking a swing, I will review the pull request.

zanieb avatar Aug 10 '22 16:08 zanieb

Updated example using submit in prefect>=2.0:

from prefect import flow, get_run_logger, task
from prefect.futures import PrefectFuture
from prefect.utilities.asyncutils import Sync


@flow
def add_flow(i: int) -> PrefectFuture[int, Sync]:
    # result is PrefectFuture[int, Sync]
    result = add_one.submit(i)

    # passing the result future will resolve to its int value
    print_result.submit(result)    # <--- line 12: pyright/mypy errors here
    return result

@task
def add_one(i: int) -> int:
    return i + 1

@task
def print_result(i: int) -> None:
    logger = get_run_logger()
    logger.info(f"print_result: {i=}")

if __name__ == "__main__":
    add_flow(1)

tekumara avatar Sep 22 '22 10:09 tekumara

The comment above still stands:

To support this, we'd need to update the input signature of your task to make each argument a Union[Future[T], T]. I do not believe this is possible with Python parameter specs yet. We may be able to write a mypy plugin to make this feasible, but pyright does not support plugins.

zanieb avatar Sep 22 '22 13:09 zanieb

Hi! Here is my workaround:

from typing import TypeVar, cast

from prefect.futures import PrefectFuture
from prefect.utilities.asyncutils import A


T = TypeVar("T")

def cast_future(val: PrefectFuture[T, A]) -> T:
    return cast(T, val)

Then the original code can be rewritten like this:

from prefect import flow, get_run_logger, task
from prefect.futures import PrefectFuture
from prefect.utilities.asyncutils import Sync


@flow
def add_flow(i: int) -> PrefectFuture[int, Sync]:
    # result is PrefectFuture[int, Sync]
    result = add_one.submit(i)

    # passing the result future will resolve to its int value
    print_result.submit(cast_future(result))  # cast to int
    return result

@task
def add_one(i: int) -> int:
    return i + 1

@task
def print_result(i: int) -> None:
    logger = get_run_logger()
    logger.info(f"print_result: {i=}")

if __name__ == "__main__":
    add_flow(1)

Why it's better than just typing.cast:

  1. With typing.cast, you can cast to any type. It increases the probability of making a mistake.
  2. With cast_future, the type is detected automatically.

Can you consider adding cast_future to prefect.futures? Or you can add a cast method to PrefectFuture class.

e10v avatar Oct 27 '23 06:10 e10v

Here is another dirty hack. But it's useful only if you:

  1. Always submit tasks.
  2. Never use PrefectFuture methods (like result). Although you can call some_task.submit explicitly and use PrefectFuture API.
from __future__ import annotations

import functools
from typing import TYPE_CHECKING, cast

import prefect


if TYPE_CHECKING:
    from collections.abc import Callable
    from typing import Any, ParamSpec, TypeVar

    P = ParamSpec("P")
    R = TypeVar("R")


class _Task(prefect.Task):
    """Task with submit on call."""
    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self.submit(*args, **kwargs)

Task = cast(type[prefect.Task], _Task)


def _cast_fn(typ: Callable[P, R], val: Callable) -> Callable[P, R]:  # noqa
    return val  # type: ignore

def _task(__fn: Callable | None = None, **kwargs: Any) -> Any:
    """Creates a Task with submit on call."""
    if __fn:
        return Task(fn=__fn, **kwargs)

    return functools.partial(_task, **kwargs)

task = _cast_fn(prefect.task, _task)

Then the original code can be rewritten like this:

from prefect import flow, get_run_logger

from <some_local_path> import task


@flow
def add_flow(i: int) -> int:
    # Note that there is no submit.
    result = add_one(i)

    print_result(result)
    return result

@task
def add_one(i: int) -> int:
    return i + 1

@task
def print_result(i: int) -> None:
    logger = get_run_logger()
    logger.info(f"print_result: {i=}")

if __name__ == "__main__":
    add_flow(1)

The code is much cleaner in this case. The drawback is that it's a bit misleading and requires some conventions among engineers.

e10v avatar Nov 26 '23 06:11 e10v

@zanieb's comment still stands here.

Though, having a future hint as Union[PrefectFuture[T], T]] I fear would just be pushing around where you see typing errors. In Prefect 3 we explicitly ask folks to resolve futures, so the following code type hints under, say, pylance strict:


@flow
def add_flow(i: int) -> PrefectFuture[int]:
    future = add_one.submit(i)
    print_result(future.result()) 
    return future

This issue has been stale for bit and there are some good workarounds. Short of a dedicated mypy plugin I think we have a lot of good workarounds. Going to close for now but don't hesitate to reopen if these workarounds don't do it for you.

aaazzam avatar Jul 11 '24 14:07 aaazzam