prefect
prefect copied to clipboard
RuntimeError: got Future <Future pending> attached to a different loop
Description
Getting the following error when running an async flow in orion server or cloud
RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /Users/noam.cohen/Library/Caches/pypoetry/virtualenvs/finance-lGLYToYy-py3.9/lib/python3.9/site-packages/anyio/from_thread.py:219> cb=[TaskGroup._spawn.<locals>.task_done() at /Users/noam.cohen/Library/Caches/pypoetry/virtualenvs/finance-lGLYToYy-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:726]> got Future <Future pending> attached to a different loop
Reproduction
import asyncio
from dotenv import load_dotenv
from prefect import task, flow
@task
async def print_values(values):
for value in values:
await asyncio.sleep(1) # yield
print(value, end=" ")
@task
def get_numbers():
return [1, 2]
@flow(name='test-flow')
async def dummy_flow():
numbers = get_numbers()
await print_values(numbers) # runs immediately
coros = [print_values("abcd"), print_values("6789")]
# asynchronously gather the tasks
await asyncio.gather(*coros)
if __name__ == '__main__':
load_dotenv()
asyncio.run(dummy_flow())
Environment
prefect 2.08b
@anna-geller so no anyio
for now?
Why did you tag me specifically for this @NoamGit? :) I'm no async expert. Could you try the same in the latest version and this example?
import asyncio
from prefect import task, flow
@task
async def print_values(values):
for value in values:
await asyncio.sleep(1) # yield
print(value, end=" ")
@flow
async def async_flow():
await print_values([1, 2]) # runs immediately
coros = [print_values("abcd"), print_values("6789")]
# asynchronously gather the tasks
await asyncio.gather(*coros)
if __name__ == "__main__":
asyncio.run(async_flow())
This works fine for me:
08:24:18.214 | INFO | prefect.engine - Created flow run 'maroon-nightingale' for flow 'async-flow'
08:24:18.215 | INFO | Flow run 'maroon-nightingale' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
08:24:18.289 | INFO | Flow run 'maroon-nightingale' - Created task run 'print_values-0bb9a2c3-0' for task 'print_values'
08:24:18.289 | INFO | Flow run 'maroon-nightingale' - Executing 'print_values-0bb9a2c3-0' immediately...
1 2 08:24:20.320 | INFO | Task run 'print_values-0bb9a2c3-0' - Finished in state Completed()
08:24:20.332 | INFO | Flow run 'maroon-nightingale' - Created task run 'print_values-0bb9a2c3-1' for task 'print_values'
08:24:20.332 | INFO | Flow run 'maroon-nightingale' - Executing 'print_values-0bb9a2c3-1' immediately...
08:24:20.339 | INFO | Flow run 'maroon-nightingale' - Created task run 'print_values-0bb9a2c3-2' for task 'print_values'
08:24:20.339 | INFO | Flow run 'maroon-nightingale' - Executing 'print_values-0bb9a2c3-2' immediately...
a 6 b 7 c 8 d 9 08:24:24.382 | INFO | Task run 'print_values-0bb9a2c3-2' - Finished in state Completed()
08:24:24.392 | INFO | Task run 'print_values-0bb9a2c3-1' - Finished in state Completed()
08:24:24.408 | INFO | Flow run 'maroon-nightingale' - Finished in state Completed('All states completed.')
And your example works if you turn the task to async:
import asyncio
from prefect import task, flow
@task
async def print_values(values):
for value in values:
await asyncio.sleep(1) # yield
print(value, end=" ")
@task
async def get_numbers():
return [1, 2]
@flow(name="test-flow")
async def dummy_flow():
numbers = await get_numbers()
await print_values(numbers) # runs immediately
coros = [print_values("abcd"), print_values("6789")]
# asynchronously gather the tasks
await asyncio.gather(*coros)
if __name__ == "__main__":
asyncio.run(dummy_flow())
I tagged you because you asked me to open this issue in a new ticket :). I know you can move everything to async and await everything, but I thought that the idea is to allow execution of regular and coroutine tasks (see Jacob's answer https://github.com/PrefectHQ/prefect/issues/5853#issuecomment-1147655272). Isn't this specified in the requirements for this feature?
Ahh gotcha, next time if you could link to that discussion, it would help, I can't remember all that 😄
Could you use one of the solutions I suggested, and if that works close the issue here? I understand what you mean, but I think the mechanics here work well enough and not sure if spending more time on optimizing this is worth it as opposed to other really cool things we can do in 2.0 instead
This does not reproduce on the latest release.
I don't know what you've changed, but I had this problem when I updated from the old version to the latest version
current time:2023-06-05