prefect icon indicating copy to clipboard operation
prefect copied to clipboard

RuntimeError: got Future <Future pending> attached to a different loop

Open NoamGit opened this issue 2 years ago • 5 comments

Description

Getting the following error when running an async flow in orion server or cloud

RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /Users/noam.cohen/Library/Caches/pypoetry/virtualenvs/finance-lGLYToYy-py3.9/lib/python3.9/site-packages/anyio/from_thread.py:219> cb=[TaskGroup._spawn.<locals>.task_done() at /Users/noam.cohen/Library/Caches/pypoetry/virtualenvs/finance-lGLYToYy-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:726]> got Future <Future pending> attached to a different loop

Reproduction

import asyncio

from dotenv import load_dotenv
from prefect import task, flow

@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1) # yield
        print(value, end=" ")

@task
def get_numbers():
    return [1, 2]

@flow(name='test-flow')
async def dummy_flow():
    numbers = get_numbers()
    await print_values(numbers)  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]

    # asynchronously gather the tasks
    await asyncio.gather(*coros)



if __name__ == '__main__':
    load_dotenv()
    asyncio.run(dummy_flow())

Environment

prefect 2.08b

NoamGit avatar Jul 11 '22 18:07 NoamGit

@anna-geller so no anyio for now?

NoamGit avatar Aug 05 '22 12:08 NoamGit

Why did you tag me specifically for this @NoamGit? :) I'm no async expert. Could you try the same in the latest version and this example?

import asyncio

from prefect import task, flow


@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1)  # yield
        print(value, end=" ")


@flow
async def async_flow():
    await print_values([1, 2])  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]

    # asynchronously gather the tasks
    await asyncio.gather(*coros)


if __name__ == "__main__":
    asyncio.run(async_flow())

This works fine for me:

08:24:18.214 | INFO    | prefect.engine - Created flow run 'maroon-nightingale' for flow 'async-flow'
08:24:18.215 | INFO    | Flow run 'maroon-nightingale' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
08:24:18.289 | INFO    | Flow run 'maroon-nightingale' - Created task run 'print_values-0bb9a2c3-0' for task 'print_values'
08:24:18.289 | INFO    | Flow run 'maroon-nightingale' - Executing 'print_values-0bb9a2c3-0' immediately...
1 2 08:24:20.320 | INFO    | Task run 'print_values-0bb9a2c3-0' - Finished in state Completed()
08:24:20.332 | INFO    | Flow run 'maroon-nightingale' - Created task run 'print_values-0bb9a2c3-1' for task 'print_values'
08:24:20.332 | INFO    | Flow run 'maroon-nightingale' - Executing 'print_values-0bb9a2c3-1' immediately...
08:24:20.339 | INFO    | Flow run 'maroon-nightingale' - Created task run 'print_values-0bb9a2c3-2' for task 'print_values'
08:24:20.339 | INFO    | Flow run 'maroon-nightingale' - Executing 'print_values-0bb9a2c3-2' immediately...
a 6 b 7 c 8 d 9 08:24:24.382 | INFO    | Task run 'print_values-0bb9a2c3-2' - Finished in state Completed()
08:24:24.392 | INFO    | Task run 'print_values-0bb9a2c3-1' - Finished in state Completed()
08:24:24.408 | INFO    | Flow run 'maroon-nightingale' - Finished in state Completed('All states completed.')

anna-geller avatar Aug 05 '22 12:08 anna-geller

And your example works if you turn the task to async:

import asyncio

from prefect import task, flow


@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1)  # yield
        print(value, end=" ")


@task
async def get_numbers():
    return [1, 2]


@flow(name="test-flow")
async def dummy_flow():
    numbers = await get_numbers()
    await print_values(numbers)  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]

    # asynchronously gather the tasks
    await asyncio.gather(*coros)


if __name__ == "__main__":
    asyncio.run(dummy_flow())

anna-geller avatar Aug 05 '22 12:08 anna-geller

I tagged you because you asked me to open this issue in a new ticket :). I know you can move everything to async and await everything, but I thought that the idea is to allow execution of regular and coroutine tasks (see Jacob's answer https://github.com/PrefectHQ/prefect/issues/5853#issuecomment-1147655272). Isn't this specified in the requirements for this feature?

NoamGit avatar Aug 05 '22 13:08 NoamGit

Ahh gotcha, next time if you could link to that discussion, it would help, I can't remember all that 😄

Could you use one of the solutions I suggested, and if that works close the issue here? I understand what you mean, but I think the mechanics here work well enough and not sure if spending more time on optimizing this is worth it as opposed to other really cool things we can do in 2.0 instead

anna-geller avatar Aug 05 '22 13:08 anna-geller

This does not reproduce on the latest release.

zanieb avatar Sep 23 '22 18:09 zanieb

I don't know what you've changed, but I had this problem when I updated from the old version to the latest version

current time:2023-06-05

Tappy95 avatar Jun 05 '23 11:06 Tappy95