RuntimeError: Timeout context manager should be used inside a task when using aiohttp and task.submit
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
When I use an aiohttp.ClientSession inside a Prefect task that is invoked through the task's submit() method, I get a "RuntimeError: Timeout context manager should be used inside a task" message.
Reproduction
import aiohttp
import asyncio
from prefect import flow, task
from prefect.utilities.asyncutils import add_event_loop_shutdown_callback
from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner)
async def my_flow():
    session = aiohttp.ClientSession()
    for url in ['https://www.google.com','https://www.microsoft.com']:
        data = await download_file.submit(session, url)
        do_something_with_data.submit(data)
    await add_event_loop_shutdown_callback(session.close)

@task()
async def download_file(session, url):
    async with session.get(url) as resp:
        assert resp.status == 200
        return await resp.read()

@task()
def do_something_with_data(data):
    ...

if __name__ == '__main__':
    asyncio.run(my_flow())
Error
21:04:11.032 | INFO | prefect.engine - Created flow run 'stoic-warthog' for flow 'my-flow'
21:04:11.167 | INFO | Flow run 'stoic-warthog' - Created task run 'download_file-1' for task 'download_file'
21:04:11.170 | INFO | Flow run 'stoic-warthog' - Submitted task run 'download_file-1' for execution.
21:04:11.207 | INFO | Flow run 'stoic-warthog' - Created task run 'download_file-0' for task 'download_file'
21:04:11.209 | INFO | Flow run 'stoic-warthog' - Submitted task run 'download_file-0' for execution.
21:04:11.317 | ERROR | Task run 'download_file-1' - Encountered exception during execution:
Traceback (most recent call last):
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\engine.py", line 2099, in orchestrate_task_run
result = await call.aresult()
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async
result = await coro
File "D:\ace\acedatatools\AceDataTools\local\task_examples.py", line 18, in download_file
async with session.get(url) as resp:
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
self._resp = await self._coro
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 507, in _request
with timer:
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
21:04:11.377 | INFO | Flow run 'stoic-warthog' - Created task run 'do_something_with_file-1' for task 'do_something_with_file'
21:04:11.379 | INFO | Flow run 'stoic-warthog' - Submitted task run 'do_something_with_file-1' for execution.
21:04:11.412 | ERROR | Task run 'download_file-0' - Encountered exception during execution:
Traceback (most recent call last):
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\engine.py", line 2099, in orchestrate_task_run
result = await call.aresult()
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async
result = await coro
File "D:\ace\acedatatools\AceDataTools\local\task_examples.py", line 18, in download_file
async with session.get(url) as resp:
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
self._resp = await self._coro
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 507, in _request
with timer:
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
21:04:11.435 | ERROR | Task run 'download_file-1' - Finished in state Failed('Task run encountered an exception RuntimeError: Timeout context manager should be used inside a task')
21:04:11.483 | ERROR | Task run 'download_file-0' - Finished in state Failed('Task run encountered an exception RuntimeError: Timeout context manager should be used inside a task')
21:04:11.548 | INFO | Flow run 'stoic-warthog' - Created task run 'do_something_with_file-0' for task 'do_something_with_file'
21:04:11.551 | INFO | Flow run 'stoic-warthog' - Submitted task run 'do_something_with_file-0' for execution.
21:04:11.624 | ERROR | Flow run 'stoic-warthog' - Finished in state Failed('2/4 states failed.')
Traceback (most recent call last):
File "D:\ace\acedatatools\AceDataTools\local\task_examples.py", line 28, in <module>
asyncio.run(my_flow())
File "C:\Users\carlo\AppData\Local\Programs\Python\Python39\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Users\carlo\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 647, in run_until_complete
return future.result()
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\api.py", line 150, in wait_for_call_in_loop_thread
return call.result()
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 318, in result
return self.future.result(timeout=timeout)
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 179, in result
return self.__get_result()
File "C:\Users\carlo\AppData\Local\Programs\Python\Python39\lib\concurrent\futures\_base.py", line 391, in __get_result
raise self._exception
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async
result = await coro
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\client\utilities.py", line 78, in with_injected_client
return await fn(*args, **kwargs)
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\engine.py", line 394, in create_then_begin_flow_run
return await state.result(fetch=True)
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\engine.py", line 2099, in orchestrate_task_run
result = await call.aresult()
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async
result = await coro
File "D:\ace\acedatatools\AceDataTools\local\task_examples.py", line 18, in download_file
async with session.get(url) as resp:
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
self._resp = await self._coro
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 507, in _request
with timer:
File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
Versions
Version: 2.18.0
API version: 0.8.4
Python version: 3.9.13
Git commit: 1006d2d8
Built: Thu, Apr 18, 2024 4:47 PM
OS/Arch: win32/AMD64
Profile: default
Server type: ephemeral
Server:
Database: sqlite
SQLite version: 3.37.2
Additional context
No response
hi @carlosjourdan - thanks for the issue
I think there are a couple things here
- I don't think it's necessary / appropriate to use our low-level callback util
add_event_loop_shutdown_callback for this; you can just use aiohttp.ClientSession as a context manager, and it will handle cleanup for you
@task()
async def download_file(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            assert resp.status == 200
            return await resp.read()
- Secondly, since tasks currently run in worker threads, you cannot directly share the same
ClientSession across threads. You can create a new session inside the task, as I do above, to avoid this, or you can use subflows, which run in the main thread
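To illustrate the threading point above: the traceback ends in aiohttp's timer context manager, which (as far as I can tell from aiohttp 3.9's helpers.py) asks asyncio for the current task on the loop the session belongs to and raises when there is none. The stdlib-only sketch below mimics that lookup from a worker thread. It does not use Prefect or aiohttp, and probe_from_worker and session_loop are illustrative names, not part of either library.

```python
import asyncio
import threading


def probe_from_worker(session_loop):
    """Run a coroutine on a fresh loop in a worker thread and record whether
    a current task is visible for that loop and for session_loop."""
    seen = {}

    async def probe():
        # The coroutine runs as a task on the worker thread's own loop.
        seen["own_loop"] = asyncio.current_task() is not None
        # Assumption: this mirrors the kind of lookup aiohttp's timer does
        # against the loop the session was created for.
        seen["session_loop"] = asyncio.current_task(loop=session_loop) is not None

    worker = threading.Thread(target=lambda: asyncio.run(probe()))
    worker.start()
    worker.join()
    return seen


# Stand-in for the loop that owns the shared session; it is not executing
# any task while the worker thread runs the probe.
session_loop = asyncio.new_event_loop()
try:
    seen = probe_from_worker(session_loop)
finally:
    session_loop.close()

print(seen)
```

The worker sees a task on its own loop but none on the loop that owns the session, which is the condition under which the "should be used inside a task" error would be raised. Creating the session inside the task keeps the session and the calling task on the same loop.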
A full working example
import asyncio
import aiohttp
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner)
async def my_flow():
    for url in ["https://www.google.com", "https://www.microsoft.com"]:
        data = await download_file.submit(url)
        do_something_with_data.submit(data)

@task()
async def download_file(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            assert resp.status == 200
            return await resp.read()

@task()
def do_something_with_data(data): ...

if __name__ == "__main__":
    asyncio.run(my_flow())
If you have any further questions, let me know. Otherwise, I will close this issue.
Hi @zzstoatzz , thanks for the fast reply. I still have two questions, if you don't mind:
- Based on #11930, shouldn't the task run on the main thread? My use case is explicitly mentioned as one of the motivations around the PR ("For example, an HTTP client or database connection can be shared between a flow and its tasks now")
- Although the flows in the example you listed complete successfully, an exception is printed to the console with the message
RuntimeError: Event loop is closed. This is actually what threw me off course towards the add_event_loop_shutdown_callback call, because I thought this had something to do with the session.close() call being made after the event loop was closed. But it seems to happen even in the absence of a shared session. Is this exception "harmless"? If so, shouldn't it be suppressed from output?
Just saw that #11930 was reverted by #12054. So this leaves only the question about the RuntimeError: Event loop is closed message.
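For reference, the message text itself comes from asyncio: a loop that has been closed refuses to schedule further callbacks. The sketch below only shows where that wording originates, using the standard library alone. It does not establish what triggers the message in the Prefect run above, and schedule_on_closed_loop is an illustrative name.

```python
import asyncio


def schedule_on_closed_loop():
    """Close a fresh event loop, then try to schedule a callback on it and
    return the resulting error message (or None if no error is raised)."""
    loop = asyncio.new_event_loop()
    loop.close()
    try:
        # Scheduling work on a closed loop is rejected by asyncio.
        loop.call_soon(lambda: None)
    except RuntimeError as exc:
        return str(exc)
    return None


message = schedule_on_closed_loop()
print(message)
```

Any cleanup code that runs after the loop has shut down and still tries to schedule work on it would hit this path, which is consistent with the guess about session.close() above but is not confirmed as the cause here.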
hi @carlosjourdan - let me look into that Event loop is closed. I don't remember seeing that but I will see if I can reproduce
Thanks @zzstoatzz. FYI, I'm able to reproduce the error using your "full working example" code on a Windows machine with a fresh venv in Python 3.9.13. But the code works fine on Python 3.11.7.
Here is the output of pip freeze for reference:
aiohttp==3.9.5
aiosignal==1.3.1
aiosqlite==0.20.0
alembic==1.13.1
annotated-types==0.6.0
anyio==3.7.1
apprise==1.7.6
asgi-lifespan==2.1.0
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==3.0.0
colorama==0.4.6
coolname==2.2.0
croniter==2.0.5
cryptography==42.0.5
dateparser==1.2.0
dnspython==2.6.1
docker==6.1.3
email_validator==2.1.1
exceptiongroup==1.2.1
frozenlist==1.4.1
fsspec==2024.3.1
google-auth==2.29.0
graphviz==0.20.3
greenlet==3.0.3
griffe==0.44.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==1.0.5
httpx==0.27.0
humanize==4.9.0
hyperframe==6.0.1
idna==3.7
importlib_metadata==7.1.0
importlib_resources==6.1.3
itsdangerous==2.2.0
Jinja2==3.1.3
jinja2-humanize-extension==0.4.0
jsonpatch==1.33
jsonpointer==2.4
jsonschema==4.21.1
jsonschema-specifications==2023.12.1
kubernetes==29.0.0
Mako==1.3.3
Markdown==3.6
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
multidict==6.0.5
oauthlib==3.2.2
orjson==3.10.1
packaging==24.0
pathspec==0.12.1
pendulum==2.1.2
prefect==2.18.1
pyasn1==0.6.0
pyasn1_modules==0.4.0
pycparser==2.22
pydantic==2.7.1
pydantic_core==2.18.2
Pygments==2.17.2
python-dateutil==2.9.0.post0
python-multipart==0.0.9
python-slugify==8.0.4
pytz==2024.1
pytzdata==2020.1
pywin32==306
PyYAML==6.0.1
readchar==4.0.6
referencing==0.35.0
regex==2024.4.28
requests==2.31.0
requests-oauthlib==2.0.0
rfc3339-validator==0.1.4
rich==13.7.1
rpds-py==0.18.0
rsa==4.9
ruamel.yaml==0.18.6
ruamel.yaml.clib==0.2.8
shellingham==1.5.4
six==1.16.0
sniffio==1.3.1
SQLAlchemy==2.0.29
text-unidecode==1.3
toml==0.10.2
typer==0.12.3
typing_extensions==4.11.0
tzdata==2024.1
tzlocal==5.2
ujson==5.9.0
urllib3==2.2.1
uvicorn==0.28.1
websocket-client==1.8.0
websockets==12.0
yarl==1.9.4
zipp==3.18.1
hi @carlosjourdan - thanks for the context! I have reproduced this specifically on older Python on Windows
I will add this issue to the backlog. We are doing work to simplify our use of threads in the prefect engine, so hopefully that should help fix / diagnose this.