
RuntimeError: Timeout context manager should be used inside a task when using aiohttp and task.submit

Open carlosjourdan opened this issue 2 years ago • 6 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the Prefect documentation for this issue.
  • [X] I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

When I use an aiohttp.ClientSession inside a Prefect task that is called with the task's submit() method, I get a "RuntimeError: Timeout context manager should be used inside a task" error.

Reproduction

import aiohttp 
import asyncio
from prefect import flow, task
from prefect.utilities.asyncutils import add_event_loop_shutdown_callback
from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner)
async def my_flow():
    session = aiohttp.ClientSession()

    for url in ['https://www.google.com','https://www.microsoft.com']:
        data = await download_file.submit(session, url)
        do_something_with_data.submit(data)

    await add_event_loop_shutdown_callback(session.close)

@task()
async def download_file(session, url):
    async with session.get(url) as resp:
        assert resp.status == 200
        return await resp.read()
    
@task()
def do_something_with_data(data):
    ...
    

if __name__ == '__main__':
    asyncio.run(my_flow())

Error

21:04:11.032 | INFO    | prefect.engine - Created flow run 'stoic-warthog' for flow 'my-flow'
21:04:11.167 | INFO    | Flow run 'stoic-warthog' - Created task run 'download_file-1' for task 'download_file'
21:04:11.170 | INFO    | Flow run 'stoic-warthog' - Submitted task run 'download_file-1' for execution.
21:04:11.207 | INFO    | Flow run 'stoic-warthog' - Created task run 'download_file-0' for task 'download_file'
21:04:11.209 | INFO    | Flow run 'stoic-warthog' - Submitted task run 'download_file-0' for execution.
21:04:11.317 | ERROR   | Task run 'download_file-1' - Encountered exception during execution:
Traceback (most recent call last):
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\engine.py", line 2099, in orchestrate_task_run
    result = await call.aresult()
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async
    result = await coro
  File "D:\ace\acedatatools\AceDataTools\local\task_examples.py", line 18, in download_file
    async with session.get(url) as resp:
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
    self._resp = await self._coro
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 507, in _request
    with timer:
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
    raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
21:04:11.377 | INFO    | Flow run 'stoic-warthog' - Created task run 'do_something_with_file-1' for task 'do_something_with_file'
21:04:11.379 | INFO    | Flow run 'stoic-warthog' - Submitted task run 'do_something_with_file-1' for execution.
21:04:11.412 | ERROR   | Task run 'download_file-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\engine.py", line 2099, in orchestrate_task_run
    result = await call.aresult()
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async
    result = await coro
  File "D:\ace\acedatatools\AceDataTools\local\task_examples.py", line 18, in download_file
    async with session.get(url) as resp:
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
    self._resp = await self._coro
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 507, in _request
    with timer:
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
    raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
21:04:11.435 | ERROR   | Task run 'download_file-1' - Finished in state Failed('Task run encountered an exception RuntimeError: Timeout context manager should be used inside a task')
21:04:11.483 | ERROR   | Task run 'download_file-0' - Finished in state Failed('Task run encountered an exception RuntimeError: Timeout context manager should be used inside a task')
21:04:11.548 | INFO    | Flow run 'stoic-warthog' - Created task run 'do_something_with_file-0' for task 'do_something_with_file'
21:04:11.551 | INFO    | Flow run 'stoic-warthog' - Submitted task run 'do_something_with_file-0' for execution.
21:04:11.624 | ERROR   | Flow run 'stoic-warthog' - Finished in state Failed('2/4 states failed.')
Traceback (most recent call last):
  File "D:\ace\acedatatools\AceDataTools\local\task_examples.py", line 28, in <module>
    asyncio.run(my_flow())
  File "C:\Users\carlo\AppData\Local\Programs\Python\Python39\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\carlo\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 647, in run_until_complete
    return future.result()
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\api.py", line 150, in wait_for_call_in_loop_thread
    return call.result()
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 179, in result
    return self.__get_result()
  File "C:\Users\carlo\AppData\Local\Programs\Python\Python39\lib\concurrent\futures\_base.py", line 391, in __get_result
    raise self._exception
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async
    result = await coro
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\client\utilities.py", line 78, in with_injected_client
    return await fn(*args, **kwargs)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\engine.py", line 394, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\engine.py", line 2099, in orchestrate_task_run
    result = await call.aresult()
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async
    result = await coro
  File "D:\ace\acedatatools\AceDataTools\local\task_examples.py", line 18, in download_file
    async with session.get(url) as resp:
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
    self._resp = await self._coro
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 507, in _request
    with timer:
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
    raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
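For context, the failing line in the traceback is aiohttp's timeout helper. As far as I can tell from the aiohttp 3.9 sources, it calls asyncio.current_task(loop=...) for the loop the ClientSession was created on and raises when no task is running on that loop. The stdlib-only sketch below imitates that check with a hypothetical LoopBoundTimer class (not aiohttp's real code). Entering it from a task on its own loop works. Entering it from a new loop in a worker thread, roughly what happens when the task runs away from the flow's loop, raises the same message.

```python
import asyncio
import threading


class LoopBoundTimer:
    """Hypothetical stand-in for a loop-bound helper such as aiohttp's timer.

    It remembers the event loop it was created for and, when entered,
    requires that a task is currently running on *that* loop.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop

    def __enter__(self) -> "LoopBoundTimer":
        # Mimics the check that fails in the traceback above.
        if asyncio.current_task(loop=self._loop) is None:
            raise RuntimeError("Timeout context manager should be used inside a task")
        return self

    def __exit__(self, *exc_info: object) -> None:
        return None


async def use_timer(timer: LoopBoundTimer) -> str:
    with timer:
        return "ok"


def demo() -> dict:
    results = {}
    flow_loop = asyncio.new_event_loop()  # stands in for the flow's event loop
    try:
        timer = LoopBoundTimer(flow_loop)

        # Same loop the timer is bound to: a task is running there, so it works.
        results["same_loop"] = flow_loop.run_until_complete(use_timer(timer))

        def worker() -> None:
            # A fresh event loop in another thread. The flow loop is idle,
            # so no task is running on the loop the timer is bound to.
            try:
                results["worker_thread"] = asyncio.run(use_timer(timer))
            except RuntimeError as exc:
                results["worker_thread"] = f"RuntimeError: {exc}"

        thread = threading.Thread(target=worker)
        thread.start()
        thread.join()
    finally:
        flow_loop.close()
    return results


if __name__ == "__main__":
    print(demo())
```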

Versions

Version:             2.18.0
API version:         0.8.4
Python version:      3.9.13
Git commit:          1006d2d8
Built:               Thu, Apr 18, 2024 4:47 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.37.2

Additional context

No response

carlosjourdan, Apr 24 '24 00:04

Hi @carlosjourdan, thanks for the issue.

I think there are a couple of things here:

  • I don't think it's necessary or appropriate to use our low-level callback utility add_event_loop_shutdown_callback for this. You can just use aiohttp.ClientSession as a context manager, and it will handle cleanup for you:
@task()
async def download_file(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            assert resp.status == 200
            return await resp.read()
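  • Secondly, since tasks currently run in worker threads (each running its own event loop), you cannot directly share the same session across threads: an aiohttp.ClientSession is bound to the event loop it was created on, which is why aiohttp raises the timeout error when the session is used from the task's loop. You can avoid this by creating a new session inside the task, as I do above, or by using subflows, which run in the main thread.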
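The fix suggested in the second point can be modelled without Prefect or the network. The sketch below uses a hypothetical LoopBoundClient class that, like aiohttp.ClientSession, belongs to the loop it was created on, and a hypothetical run_in_worker_thread helper that roughly imitates a task running on a new event loop in a worker thread. It is a simplified model under those assumptions, not Prefect's task runner: a client created inside the task's coroutine works, while one created on the "flow" loop and passed in fails.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class LoopBoundClient:
    """Hypothetical client that, like aiohttp.ClientSession, belongs to the
    event loop that was running when it was created."""

    def __init__(self) -> None:
        self._loop = asyncio.get_running_loop()  # raises if no loop is running

    async def fetch(self, url: str) -> str:
        if asyncio.get_running_loop() is not self._loop:
            raise RuntimeError("client used from a different event loop")
        await asyncio.sleep(0)  # placeholder for real network I/O
        return f"data from {url}"

    async def __aenter__(self) -> "LoopBoundClient":
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        return None  # a real client would close its connections here


async def download_with_own_client(url: str) -> str:
    # Suggested pattern: create the client inside the task itself.
    async with LoopBoundClient() as client:
        return await client.fetch(url)


async def download_with_shared_client(client: LoopBoundClient, url: str) -> str:
    # Original pattern: reuse a client created on the flow's loop.
    return await client.fetch(url)


async def make_client() -> LoopBoundClient:
    return LoopBoundClient()


def run_in_worker_thread(coro_fn, *args):
    """Hypothetical helper: run a coroutine function on a new loop in a worker thread."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(coro_fn(*args))).result()


def demo() -> dict:
    url = "https://www.google.com"
    results = {"own_client": run_in_worker_thread(download_with_own_client, url)}

    flow_loop = asyncio.new_event_loop()  # stands in for the flow's event loop
    try:
        shared = flow_loop.run_until_complete(make_client())
        try:
            results["shared_client"] = run_in_worker_thread(
                download_with_shared_client, shared, url
            )
        except RuntimeError as exc:
            results["shared_client"] = f"RuntimeError: {exc}"
    finally:
        flow_loop.close()
    return results


if __name__ == "__main__":
    print(demo())
```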
A full working example
import asyncio

import aiohttp

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner


@flow(task_runner=ConcurrentTaskRunner)
async def my_flow():
    for url in ["https://www.google.com", "https://www.microsoft.com"]:
        data = await download_file.submit(url)
        do_something_with_data.submit(data)


@task()
async def download_file(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            assert resp.status == 200
            return await resp.read()


@task()
def do_something_with_data(data): ...


if __name__ == "__main__":
    asyncio.run(my_flow())

If you have any further questions, let me know; otherwise I will close this issue.

zzstoatzz, Apr 24 '24 13:04

Hi @zzstoatzz, thanks for the fast reply. I still have two questions, if you don't mind:

  • Based on #11930, shouldn't the task run on the main thread? My use case is explicitly mentioned as one of the motivations around the PR ("For example, an HTTP client or database connection can be shared between a flow and its tasks now").
  • Although the flow in the example you posted completes successfully, an exception with the message RuntimeError: Event loop is closed is printed to the console. This is actually what led me to the add_event_loop_shutdown_callback call, because I thought it had something to do with session.close() being called after the event loop was closed. But it happens even without a shared session. Is this exception "harmless"? If so, shouldn't it be suppressed from the output?
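For reference, RuntimeError: Event loop is closed is the message asyncio itself raises when something tries to schedule work on a loop that has already been closed. The minimal stdlib sketch below triggers it directly. One known way this shows up on Windows is a Proactor transport being cleaned up after asyncio.run() has already closed its loop; whether that is the cause here is an assumption that has not been verified in this thread.

```python
import asyncio


def schedule_on_closed_loop() -> str:
    loop = asyncio.new_event_loop()
    loop.close()
    try:
        # Any code that schedules a callback on an already-closed loop,
        # such as late cleanup running after shutdown, hits this check.
        loop.call_soon(print, "never printed")
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"
    return "no error"


if __name__ == "__main__":
    print(schedule_on_closed_loop())
```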

carlosjourdan, Apr 24 '24 15:04

Just saw that #11930 was reverted by #12054. So this leaves only the question about the RuntimeError: Event loop is closed message.

carlosjourdan, Apr 24 '24 16:04

Hi @carlosjourdan, let me look into that Event loop is closed error. I don't remember seeing it, but I will see if I can reproduce it.

zzstoatzz, Apr 29 '24 19:04

Thanks @zzstoatzz. FYI, I'm able to reproduce the error using your "full working example" code on a Windows machine with a fresh venv on Python 3.9.13, but the same code works fine on Python 3.11.7.
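When comparing the two environments, a small script like the hypothetical one below can record the details that seem most relevant to an event-loop difference: the Python version, the platform, and which event loop class asyncio creates by default. On Windows the default has been the Proactor event loop since Python 3.8. The script only gathers facts; it does not change any behaviour.

```python
import asyncio
import platform
import sys


def loop_diagnostics() -> dict:
    """Hypothetical helper: report interpreter and default event loop details."""
    loop = asyncio.new_event_loop()
    try:
        loop_class = type(loop).__name__  # e.g. the Proactor loop on Windows
    finally:
        loop.close()
    return {
        "python": platform.python_version(),
        "platform": sys.platform,
        "loop_class": loop_class,
    }


if __name__ == "__main__":
    for key, value in loop_diagnostics().items():
        print(f"{key:>10}: {value}")
```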

Here is the output of pip freeze for reference:

aiohttp==3.9.5
aiosignal==1.3.1
aiosqlite==0.20.0
alembic==1.13.1
annotated-types==0.6.0
anyio==3.7.1
apprise==1.7.6
asgi-lifespan==2.1.0
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==3.0.0
colorama==0.4.6
coolname==2.2.0
croniter==2.0.5
cryptography==42.0.5
dateparser==1.2.0
dnspython==2.6.1
docker==6.1.3
email_validator==2.1.1
exceptiongroup==1.2.1
frozenlist==1.4.1
fsspec==2024.3.1
google-auth==2.29.0
graphviz==0.20.3
greenlet==3.0.3
griffe==0.44.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==1.0.5
httpx==0.27.0
humanize==4.9.0
hyperframe==6.0.1
idna==3.7
importlib_metadata==7.1.0
importlib_resources==6.1.3
itsdangerous==2.2.0
Jinja2==3.1.3
jinja2-humanize-extension==0.4.0
jsonpatch==1.33
jsonpointer==2.4
jsonschema==4.21.1
jsonschema-specifications==2023.12.1
kubernetes==29.0.0
Mako==1.3.3
Markdown==3.6
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
multidict==6.0.5
oauthlib==3.2.2
orjson==3.10.1
packaging==24.0
pathspec==0.12.1
pendulum==2.1.2
prefect==2.18.1
pyasn1==0.6.0
pyasn1_modules==0.4.0
pycparser==2.22
pydantic==2.7.1
pydantic_core==2.18.2
Pygments==2.17.2
python-dateutil==2.9.0.post0
python-multipart==0.0.9
python-slugify==8.0.4
pytz==2024.1
pytzdata==2020.1
pywin32==306
PyYAML==6.0.1
readchar==4.0.6
referencing==0.35.0
regex==2024.4.28
requests==2.31.0
requests-oauthlib==2.0.0
rfc3339-validator==0.1.4
rich==13.7.1
rpds-py==0.18.0
rsa==4.9
ruamel.yaml==0.18.6
ruamel.yaml.clib==0.2.8
shellingham==1.5.4
six==1.16.0
sniffio==1.3.1
SQLAlchemy==2.0.29
text-unidecode==1.3
toml==0.10.2
typer==0.12.3
typing_extensions==4.11.0
tzdata==2024.1
tzlocal==5.2
ujson==5.9.0
urllib3==2.2.1
uvicorn==0.28.1
websocket-client==1.8.0
websockets==12.0
yarl==1.9.4
zipp==3.18.1

carlosjourdan avatar Apr 29 '24 22:04 carlosjourdan

Hi @carlosjourdan, thanks for the context! I have reproduced this, specifically on older Python versions on Windows.

I will add this issue to the backlog. We are working on simplifying our use of threads in the Prefect engine, which should hopefully help us diagnose and fix this.

zzstoatzz, Apr 29 '24 23:04