prefect
prefect copied to clipboard
Argument of type "PrefectFuture[int, Sync]" cannot be assigned to parameter "i" of type "int" in function "__call__"
Description
pyright error:
basic_flow.py:12:18 - error: Argument of type "PrefectFuture[int, Sync]" cannot be assigned to parameter "i" of type "int" in function "__call__"
"PrefectFuture[int, Sync]" is incompatible with "int" (reportGeneralTypeIssues)
mypy error:
basic_flow.py:12: error: No overload variant of "__call__" of "Task" matches argument type "PrefectFuture[None, Literal[False]]"
basic_flow.py:12: note: Possible overload variants:
basic_flow.py:12: note: def __call__(self, i: int) -> PrefectFuture[None, Literal[False]]
basic_flow.py:12: note: def __call__(self, i: int) -> Awaitable[PrefectFuture[<nothing>, Literal[True]]]
Reproduction / Example
basic_flow.py:
from prefect import flow, get_run_logger, task
from prefect.futures import PrefectFuture
from prefect.utilities.asyncio import Sync
@flow
def add_flow(i: int) -> PrefectFuture[int, Sync]:
# result is PrefectFuture[int, Sync]
result = add_one(i)
# passing the result future will resolve to its int value
print_result(result) # <--- line 12: pyright/mypy errors here
return result
@task
def add_one(i: int) -> int:
return i + 1
@task
def print_result(i: int) -> None:
logger = get_run_logger()
logger.info(f"print_result: {i=}")
if __name__ == "__main__":
add_flow(1)
prefect 2.0b8
We automatically coerce futures into values for you, but that kind of magic isn't supported by type checkers. I'm not sure this is something we can reasonably address. To support this, we'd need to update the input signature of your task to make each argument a Union[Future[T], T]
. I do not believe this is possible with Python parameter specs yet. We may be able to write a mypy plugin to make this feasible, but pyright does not support plugins.
Hello I'm having the same issue, interested in a workaround
I'm certain this will require a mypy
plugin. If someone is interested in taking a swing, I will review the pull request.
Updated example using submit
in prefect>=2.0
:
from prefect import flow, get_run_logger, task
from prefect.futures import PrefectFuture
from prefect.utilities.asyncutils import Sync
@flow
def add_flow(i: int) -> PrefectFuture[int, Sync]:
# result is PrefectFuture[int, Sync]
result = add_one.submit(i)
# passing the result future will resolve to its int value
print_result.submit(result) # <--- line 12: pyright/mypy errors here
return result
@task
def add_one(i: int) -> int:
return i + 1
@task
def print_result(i: int) -> None:
logger = get_run_logger()
logger.info(f"print_result: {i=}")
if __name__ == "__main__":
add_flow(1)
The comment above still stands:
To support this, we'd need to update the input signature of your task to make each argument a Union[Future[T], T]. I do not believe this is possible with Python parameter specs yet. We may be able to write a mypy plugin to make this feasible, but pyright does not support plugins.
Hi! Here is my workaround:
from typing import TypeVar, cast
from prefect.futures import PrefectFuture
from prefect.utilities.asyncutils import A
T = TypeVar("T")
def cast_future(val: PrefectFuture[T, A]) -> T:
return cast(T, val)
Then the original code can be rewritten like this:
from prefect import flow, get_run_logger, task
from prefect.futures import PrefectFuture
from prefect.utilities.asyncutils import Sync
@flow
def add_flow(i: int) -> PrefectFuture[int, Sync]:
# result is PrefectFuture[int, Sync]
result = add_one.submit(i)
# passing the result future will resolve to its int value
print_result.submit(cast_future(result)) # cast to int
return result
@task
def add_one(i: int) -> int:
return i + 1
@task
def print_result(i: int) -> None:
logger = get_run_logger()
logger.info(f"print_result: {i=}")
if __name__ == "__main__":
add_flow(1)
Why it's better than just typing.cast
:
- With
typing.cast
, you can cast to any type. It increases the probability of making a mistake. - With
cast_future
, the type is detected automatically.
Can you consider adding cast_future
to prefect.futures
? Or you can add a cast
method to PrefectFuture
class.
Here is another dirty hack. But it's useful only if you:
- Always
submit
tasks. - Never use
PrefectFuture
methods (likeresult
). Although you can callsome_task.submit
explicitly and usePrefectFuture
API.
from __future__ import annotations
import functools
from typing import TYPE_CHECKING, cast
import prefect
if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, ParamSpec, TypeVar
P = ParamSpec("P")
R = TypeVar("R")
class _Task(prefect.Task):
"""Task with submit on call."""
def __call__(self, *args: Any, **kwargs: Any) -> Any:
return self.submit(*args, **kwargs)
Task = cast(type[prefect.Task], _Task)
def _cast_fn(typ: Callable[P, R], val: Callable) -> Callable[P, R]: # noqa
return val # type: ignore
def _task(__fn: Callable | None = None, **kwargs: Any) -> Any:
"""Creates a Task with submit on call."""
if __fn:
return Task(fn=__fn, **kwargs)
return functools.partial(_task, **kwargs)
task = _cast_fn(prefect.task, _task)
Then the original code can be rewritten like this:
from prefect import flow, get_run_logger
from <some_local_path> import task
@flow
def add_flow(i: int) -> int:
# Note that there is no submit.
result = add_one(i)
print_result(result)
return result
@task
def add_one(i: int) -> int:
return i + 1
@task
def print_result(i: int) -> None:
logger = get_run_logger()
logger.info(f"print_result: {i=}")
if __name__ == "__main__":
add_flow(1)
The code is much cleaner in this case. The drawback is that it's a bit misleading and requires some conventions among engineers.
@zanieb's comment still stands here.
Though, having a future hint as Union[PrefectFuture[T], T]]
I fear would just be pushing around where you see typing errors. In Prefect 3 we explicitly ask folks to resolve futures, so the following code type hints under, say, pylance strict:
@flow
def add_flow(i: int) -> PrefectFuture[int]:
future = add_one.submit(i)
print_result(future.result())
return future
This issue has been stale for bit and there are some good workarounds. Short of a dedicated mypy plugin I think we have a lot of good workarounds. Going to close for now but don't hesitate to reopen if these workarounds don't do it for you.