prefect
prefect copied to clipboard
Database QueuePool issue with 1500+ task flows
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
I have a flow with a current size of 3000 tasks that I map across the ConcurrentTaskRunner. However, when I try to run the flow with around 1500 tasks or more, I get this QueuePool error.
Reproduction
import time
from datetime import datetime
from prefect import flow, task, get_run_logger
@task(name="Test Task")
def test_task(my_range):
logger = get_run_logger()
logger.info(my_range)
time.sleep(5)
logger.info(datetime.today())
@flow(name="Test Flow")
def test_flow():
iterate_list = [x for x in range(3000)]
blah = test_task.map(iterate_list)
print(blah)
if __name__ == "__main__":
test_flow()
Error
Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/exceptions.py", line 93, in __call__
raise exc
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/exceptions.py", line 82, in __call__
await self.app(scope, receive, sender)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 670, in __call__
await route.handle(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 266, in handle
await self.app(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 65, in app
response = await func(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
response = await default_handler(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 227, in app
raw_response = await run_endpoint_function(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
return await dependant.call(**values)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/api/task_runs.py", line 46, in create_task_run
model = await models.task_runs.create_task_run(session=session, task_run=task_run)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 112, in async_wrapper
return await fn(*args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 61, in create_task_run
await session.execute(insert_stmt)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 215, in execute
result = await greenlet_spawn(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1711, in execute
conn = self._connection_for_bind(bind)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
return self._transaction._connection_for_bind(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 747, in _connection_for_bind
conn = bind.connect()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/future/engine.py", line 406, in connect
return super(Engine, self).connect()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3315, in connect
return self._connection_cls(self, close_with_result=close_with_result)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
else engine.raw_connection()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3394, in raw_connection
return self._wrap_pool_connect(self.pool.connect, _connection)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
return fn()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 310, in connect
return _ConnectionFairy._checkout(self)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 868, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 476, in checkout
rec = pool._do_get()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 134, in _do_get
raise exc.TimeoutError(
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: https://sqlalche.me/e/14/3o7r)
16:20:57.674 | ERROR | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/exceptions.py", line 93, in __call__
raise exc
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/exceptions.py", line 82, in __call__
await self.app(scope, receive, sender)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 670, in __call__
await route.handle(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 266, in handle
await self.app(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 65, in app
response = await func(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
response = await default_handler(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 227, in app
raw_response = await run_endpoint_function(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
return await dependant.call(**values)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/api/task_runs.py", line 46, in create_task_run
model = await models.task_runs.create_task_run(session=session, task_run=task_run)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 112, in async_wrapper
return await fn(*args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 61, in create_task_run
await session.execute(insert_stmt)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 215, in execute
result = await greenlet_spawn(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1711, in execute
conn = self._connection_for_bind(bind)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
return self._transaction._connection_for_bind(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 747, in _connection_for_bind
conn = bind.connect()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/future/engine.py", line 406, in connect
return super(Engine, self).connect()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3315, in connect
return self._connection_cls(self, close_with_result=close_with_result)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
else engine.raw_connection()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3394, in raw_connection
return self._wrap_pool_connect(self.pool.connect, _connection)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
return fn()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 310, in connect
return _ConnectionFairy._checkout(self)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 868, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 476, in checkout
rec = pool._do_get()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 134, in _do_get
raise exc.TimeoutError(
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: https://sqlalche.me/e/14/3o7r)
16:20:57.676 | ERROR | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/exceptions.py", line 93, in __call__
raise exc
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/exceptions.py", line 82, in __call__
await self.app(scope, receive, sender)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 670, in __call__
await route.handle(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 266, in handle
await self.app(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 65, in app
response = await func(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
response = await default_handler(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 227, in app
raw_response = await run_endpoint_function(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
return await dependant.call(**values)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/api/task_runs.py", line 46, in create_task_run
model = await models.task_runs.create_task_run(session=session, task_run=task_run)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 112, in async_wrapper
return await fn(*args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 61, in create_task_run
await session.execute(insert_stmt)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 215, in execute
result = await greenlet_spawn(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1711, in execute
conn = self._connection_for_bind(bind)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
return self._transaction._connection_for_bind(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 747, in _connection_for_bind
conn = bind.connect()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/future/engine.py", line 406, in connect
return super(Engine, self).connect()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3315, in connect
return self._connection_cls(self, close_with_result=close_with_result)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
else engine.raw_connection()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3394, in raw_connection
return self._wrap_pool_connect(self.pool.connect, _connection)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
return fn()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 310, in connect
return _ConnectionFairy._checkout(self)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 868, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 476, in checkout
rec = pool._do_get()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 134, in _do_get
raise exc.TimeoutError(
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: https://sqlalche.me/e/14/3o7r)
16:20:57.676 | ERROR | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/exceptions.py", line 93, in __call__
raise exc
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/exceptions.py", line 82, in __call__
await self.app(scope, receive, sender)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 670, in __call__
await route.handle(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 266, in handle
await self.app(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 65, in app
response = await func(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
response = await default_handler(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 227, in app
raw_response = await run_endpoint_function(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
return await dependant.call(**values)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/api/task_runs.py", line 46, in create_task_run
model = await models.task_runs.create_task_run(session=session, task_run=task_run)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 112, in async_wrapper
return await fn(*args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 61, in create_task_run
await session.execute(insert_stmt)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 215, in execute
result = await greenlet_spawn(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1711, in execute
conn = self._connection_for_bind(bind)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
return self._transaction._connection_for_bind(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 747, in _connection_for_bind
conn = bind.connect()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/future/engine.py", line 406, in connect
return super(Engine, self).connect()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3315, in connect
return self._connection_cls(self, close_with_result=close_with_result)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
else engine.raw_connection()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3394, in raw_connection
return self._wrap_pool_connect(self.pool.connect, _connection)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
return fn()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 310, in connect
return _ConnectionFairy._checkout(self)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 868, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 476, in checkout
rec = pool._do_get()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 134, in _do_get
raise exc.TimeoutError(
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: https://sqlalche.me/e/14/3o7r)
16:20:57.708 | INFO | Task run 'Test Task-ee4e2833-2964' - Finished in state Completed()
16:20:58.746 | INFO | Task run 'Test Task-ee4e2833-1524' - Finished in state Completed()
16:20:58.974 | INFO | Task run 'Test Task-ee4e2833-1494' - 1494
16:20:58.983 | INFO | Task run 'Test Task-ee4e2833-1494' - 2022-08-19 16:20:58.983039
16:20:59.028 | ERROR | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 709, in _start_transaction
self._transaction = self._connection.transaction(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/asyncpg/connection.py", line 275, in transaction
self._check_open()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/asyncpg/connection.py", line 1400, in _check_open
raise exceptions.InterfaceError('connection is closed')
asyncpg.exceptions._base.InterfaceError: connection is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
cursor.execute(statement, parameters)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
self._adapt_connection.await_(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 408, in _prepare_and_execute
await adapt_connection._start_transaction()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 716, in _start_transaction
self._handle_exception(error)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
raise translated_error from error
sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.InterfaceError: <class 'asyncpg.exceptions._base.InterfaceError'>: connection is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/exceptions.py", line 93, in __call__
raise exc
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/exceptions.py", line 82, in __call__
await self.app(scope, receive, sender)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 670, in __call__
await route.handle(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 266, in handle
await self.app(scope, receive, send)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 65, in app
response = await func(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
response = await default_handler(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 227, in app
raw_response = await run_endpoint_function(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
return await dependant.call(**values)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/api/task_runs.py", line 186, in set_task_run_state
orchestration_result = await models.task_runs.set_task_run_state(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 285, in set_task_run_state
run = await models.task_runs.read_task_run(session=session, task_run_id=task_run_id)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 112, in async_wrapper
return await fn(*args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 103, in read_task_run
model = await session.get(db.TaskRun, task_run_id)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 299, in get
return await greenlet_spawn(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
result = context.switch(value)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2805, in get
return self._get_impl(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2912, in _get_impl
return db_load_fn(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
session.execute(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1712, in execute
result = conn._execute_20(statement, params or {}, execution_options)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 333, in _execute_on_connection
return connection._execute_clauseelement(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
ret = self._execute_context(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
self._handle_dbapi_exception(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
util.raise_(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
raise exception
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
cursor.execute(statement, parameters)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
self._adapt_connection.await_(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 408, in _prepare_and_execute
await adapt_connection._start_transaction()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 716, in _start_transaction
self._handle_exception(error)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
raise translated_error from error
sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: connection is closed
[SQL: SELECT task_run.id AS task_run_id, task_run.created AS task_run_created, task_run.updated AS task_run_updated, task_run.name AS task_run_name, task_run.state_type AS task_run_state_type, task_run.state_name AS task_run_state_name, task_run.run_count AS task_run_run_count, task_run.expected_start_time AS task_run_expected_start_time, task_run.next_scheduled_start_time AS task_run_next_scheduled_start_time, task_run.start_time AS task_run_start_time, task_run.end_time AS task_run_end_time, task_run.total_run_time AS task_run_total_run_time, task_run.task_key AS task_run_task_key, task_run.dynamic_key AS task_run_dynamic_key, task_run.cache_key AS task_run_cache_key, task_run.cache_expiration AS task_run_cache_expiration, task_run.task_version AS task_run_task_version, task_run.empirical_policy AS task_run_empirical_policy, task_run.task_inputs AS task_run_task_inputs, task_run.tags AS task_run_tags, task_run.flow_run_id AS task_run_flow_run_id, task_run.state_id AS task_run_state_id, task_run_state_1.id AS task_run_state_1_id, task_run_state_1.created AS task_run_state_1_created, task_run_state_1.updated AS task_run_state_1_updated, task_run_state_1.type AS task_run_state_1_type, task_run_state_1.timestamp AS task_run_state_1_timestamp, task_run_state_1.name AS task_run_state_1_name, task_run_state_1.message AS task_run_state_1_message, task_run_state_1.state_details AS task_run_state_1_state_details, task_run_state_1.data AS task_run_state_1_data, task_run_state_1.task_run_id AS task_run_state_1_task_run_id
FROM task_run LEFT OUTER JOIN task_run_state AS task_run_state_1 ON task_run_state_1.id = task_run.state_id
WHERE task_run.id = %s]
[parameters: (UUID('099bf6d1-1062-42a4-8cd7-05e97d4b0adf'),)]
(Background on this error at: https://sqlalche.me/e/14/rvf5)
Versions
Version: 2.1.1
API version: 0.8.0
Python version: 3.10.4
Git commit: dc2ba222
Built: Thu, Aug 18, 2022 10:18 AM
OS/Arch: linux/x86_64
Profile: default
Server type: ephemeral
Server:
Database: postgresql
Additional context
This is a flow I am porting from Prefect 1.0 where it runs just fine at the 3000 task level. I even have flows that run more than 3000 mapped tasks without any issue on Prefect 1.0, which I would like to port to 2.0.
Thanks for the issue! I see you’re using the ephemeral server which means that each client needs to connect to your postgres database. This is only intended to facilitate onboarding and small flows. Once you start to scale out, you should run the API in a separate process with prefect orion start
then connect your flow runs to it by specifying the PREFECT_API_URL
.
If that doesn't resolve it definitely let me know and we can dig into this issue.
Getting the following error, but I am trying again.
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 566, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/home/tenders/Documents/code/prefect-orion/platform_prefect/accounts/accounts_subscriptions.py", line 318, in main
sub_data.extend(item.result())
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/futures.py", line 216, in result
return sync(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 221, in sync
return run_async_from_worker_thread(__async_fn, *args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/futures.py", line 227, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/task_runners.py", line 316, in _run_and_store_result
self._results[run_key] = await run_fn(**run_kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 957, in begin_task_run
raise RuntimeError(
RuntimeError: Cannot orchestrate task run '64a9b43e-1b48-4032-9015-d7a229d7ad44'. Failed to connect to API at http://127.0.0.1:4200/api/.```
This stack trace might be more useful, but I am not getting the QueuePool error!
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/connection.py", line 88, in handle_async_request
raise ConnectionNotAvailable()
httpcore.ConnectionNotAvailable
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client.py", line 379, in api_healthcheck
await self._client.get("/health")
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1751, in get
return await self.request(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client.py", line 259, in send
await super().send(*args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1614, in send
response = await self._send_handling_auth(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
response = await self._send_handling_redirects(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
response = await self._send_single_request(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1716, in _send_single_request
response = await transport.handle_async_request(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 246, in handle_async_request
async with self._pool_lock:
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_core/_synchronization.py", line 130, in acquire
await event.wait()
File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
await fut
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client.py", line 378, in api_healthcheck
with anyio.fail_after(10):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
raise TimeoutError
TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client.py", line 105, in with_injected_client
return await fn(*args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 228, in create_then_begin_flow_run
return state.result()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 566, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/home/tenders/Documents/code/prefect-orion/platform_prefect/accounts/accounts_subscriptions.py", line 318, in main
sub_data.extend(item.result())
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/futures.py", line 216, in result
return sync(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 221, in sync
return run_async_from_worker_thread(__async_fn, *args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/futures.py", line 227, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/task_runners.py", line 316, in _run_and_store_result
self._results[run_key] = await run_fn(**run_kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 957, in begin_task_run
raise RuntimeError(
RuntimeError: Cannot orchestrate task run '287e4fef-5daf-4689-8007-a9389aa5944b'. Failed to connect to API at http://127.0.0.1:4200/api/.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/tenders/Documents/code/prefect-orion/platform_prefect/accounts/accounts_subscriptions.py", line 336, in <module>
flow_result = main()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/flows.py", line 384, in __call__
return enter_flow_run_engine_from_flow_call(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client.py", line 103, in with_injected_client
async with client_context as client:
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client.py", line 2015, in __aexit__
return await self._exit_stack.__aexit__(*exc_info)
File "/usr/lib/python3.10/contextlib.py", line 714, in __aexit__
raise exc_details[1]
File "/usr/lib/python3.10/contextlib.py", line 697, in __aexit__
cb_suppress = await cb(*exc_details)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1997, in __aexit__
await self._transport.__aexit__(exc_type, exc_value, traceback)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_transports/default.py", line 332, in __aexit__
await self._pool.__aexit__(exc_type, exc_value, traceback)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 326, in __aexit__
await self.aclose()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 312, in aclose
raise RuntimeError(
RuntimeError: The connection pool was closed while 210 HTTP requests/responses were still in-flight.```
Are you running your flow on the same machine that you started the server on? It doesn't look like the server is available at that URL.
Yes I am using the same machine, and things started out just fine. I can still see the Orion server process running and the UI is up at my defined PREFECT_API_URL.
http://127.0.0.1:4200/api
So just to clarify, some of your tasks are running and successfully connecting to the API but eventually one fails with this ConnectionNotAvailable
exception?
That looks to be what is happening, yes.
Checking in on this issue. Is this something being looked into? It is still tagged "As Designed". Thank you.
I also get the Cannot orchestrate task run
error with the SequentialTaskRunner
It looks like somehow the client is getting saturated and tasks are failing to connect to the API with httpcore.ConnectionNotAvailable
This will require further investigation. cc @zangell44 on the performance raft.
Update: @gabcoyne pointed me towards hooking into the cloud API and that worked once, but then I adjusted something in the flow and I get the same error. Here is the stack trace:
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 236, in create_then_begin_flow_run
return state.result()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 587, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/home/tenders/Documents/code/prefect-orion/platform_prefect/accounts/accounts_subscriptions.py", line 320, in main
sub_data.extend(item.result())
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/futures.py", line 216, in result
return sync(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 221, in sync
return run_async_from_worker_thread(__async_fn, *args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/futures.py", line 227, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/task_runners.py", line 214, in submit
result = await run_fn(**run_kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 994, in begin_task_run
raise RuntimeError(
RuntimeError: Cannot orchestrate task run '41a79582-00c0-4177-985b-c9c39a56f41e'. Failed to connect to API at https://api.prefect.cloud/api/accounts/cafe3a79-624b-468d-87a7-97fde3358a01/workspaces/5d09d677-90b8-4ef8-a9be-9760b422937a/.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/tenders/Documents/code/prefect-orion/platform_prefect/accounts/accounts_subscriptions.py", line 339, in <module>
flow_result = main()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/flows.py", line 384, in __call__
return enter_flow_run_engine_from_flow_call(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client.py", line 102, in with_injected_client
async with client_context as client:
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client.py", line 1930, in __aexit__
return await self._exit_stack.__aexit__(*exc_info)
File "/usr/lib/python3.10/contextlib.py", line 714, in __aexit__
raise exc_details[1]
File "/usr/lib/python3.10/contextlib.py", line 697, in __aexit__
cb_suppress = await cb(*exc_details)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1997, in __aexit__
await self._transport.__aexit__(exc_type, exc_value, traceback)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_transports/default.py", line 332, in __aexit__
await self._pool.__aexit__(exc_type, exc_value, traceback)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 326, in __aexit__
await self.aclose()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 312, in aclose
raise RuntimeError(
RuntimeError: The connection pool was closed while 1 HTTP requests/responses were still in-flight.```
Is there an update on this issue? I am running into the same error when trying to run a flow with large number of tasks.
Is there an update on this issue? I am running into the same error when trying to run a flow with large number of tasks.
We are still investigating this, thanks for reporting!
Any update on this? Basically the ConcurrentTaskRunner
is non-functional at this point without this getting fixed. This is preventing our team form adopting Prefect!
Hi @ratulotron! We're actively working on these types of issues and hope to have pull requests to resolve them within the next few weeks.
With that said I was actually attempting to reproduce this issue yesterday and was unable to do so on the most recent version. Have you tried it recently? If you have and are still seeing issues, do you have an example that's failing?
In that case there is a possibility that I am running things in a wrong way. I have the following code as a deployment at the moment, without any concurrency limit as that didn't seem to effect the error. I am using a local PostgreSQL database as well. There are around 30k items, I can see a lot of entries get processed (i.e. the ID gets printed) then all of a sudden I get the error TimeOut
. I tried making the functions async as well, but the result was same. Prefect version is 2.6.7
.
Edit: removed unused ElasticSearch related stuff from snippet.
from prefect import flow, task, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
import json
FILENAME = "items.json"
# Method to push a single doc
@task(
retries=3,
retry_delay_seconds=300,
timeout_seconds=10
)
def create_doc(company):
logger = get_run_logger()
item_id = company["item_id"]
logger.info("Now processing <%s>!", item_id)
return item_id
@flow(task_runner=ConcurrentTaskRunner())
def push_to_elasticsearch():
with open(FILENAME, "r", encoding="utf-8") as f:
for line in f:
data = json.loads(line)
create_doc.submit(data)
if __name__ == "__main__":
push_to_elasticsearch()
Error trace:
16:36:02.843 | ERROR | Task run 'create_doc-e82099c9-216' - Crash detected!
Execution was cancelled by the runtime environment.
16:36:08.511 | ERROR | Flow run 'adorable-squirrel' - Crash detected!
Execution was interrupted by an unexpected exception: Traceback (most recent
call last):
File
"/Users/ratulotron/.pyenv/versions/3.8.14/lib/python3.8/contextlib.py",
line 189, in __aexit__
await self.gen.athrow(typ, value, traceback)
File
"/Users/ratulotron/.poetry/virtualenvs/sb-dataeng-notebooks-nb-GiflI-py3.8/
lib/python3.8/site-packages/prefect/task_runners.py", line 163, in start
yield self
File
"/Users/ratulotron/.poetry/virtualenvs/sb-dataeng-notebooks-nb-GiflI-py3.8/
lib/python3.8/site-packages/prefect/engine.py", line 361, in begin_flow_run
terminal_state = await orchestrate_flow_run(
File
"/Users/ratulotron/.poetry/virtualenvs/sb-dataeng-notebooks-nb-GiflI-py3.8/
lib/python3.8/site-packages/prefect/engine.py", line 610, in
orchestrate_flow_run
result = await run_sync(flow_call)
File
"/Users/ratulotron/.poetry/virtualenvs/sb-dataeng-notebooks-nb-GiflI-py3.8/
lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 68, in
run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File
"/Users/ratulotron/.poetry/virtualenvs/sb-dataeng-notebooks-nb-GiflI-py3.8/
lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File
"/Users/ratulotron/.poetry/virtualenvs/sb-dataeng-notebooks-nb-GiflI-py3.8/
lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in
run_sync_in_worker_thread
return await future
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached,
connection timed out, timeout 30.00 (Background on this error at:
https://sqlalche.me/e/14/3o7r)
Despite the work in #8887 I'm encountering a locked database with SQLite
12:48:19.505 | INFO | Task run 'Test Task-2809' - WorkerThread-338
12:48:19.512 | ERROR | prefect.server - Encountered exception in request:
Traceback (most recent call last):
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
self._adapt_connection._handle_exception(error)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
raise error
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
self.await_(_cursor.execute(operation, parameters))
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/cursor.py", line 37, in execute
await self._execute(self._cursor.execute, sql, parameters)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/cursor.py", line 31, in _execute
return await self._conn._execute(fn, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/core.py", line 137, in _execute
return await future
^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/core.py", line 110, in run
result = function()
^^^^^^^^^^
sqlite3.OperationalError: database is locked
model = await models.task_runs.create_task_run(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/dev/prefect/src/prefect/server/database/dependencies.py", line 119, in async_wrapper
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/dev/prefect/src/prefect/server/models/task_runs.py", line 63, in create_task_run
await session.execute(insert_stmt)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 214, in execute
result = await greenlet_spawn(
^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
result = context.throw(*sys.exc_info())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1714, in execute
result = conn._execute_20(statement, params or {}, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
self._handle_dbapi_exception(
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
util.raise_(
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
self._adapt_connection._handle_exception(error)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
raise error
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
self.await_(_cursor.execute(operation, parameters))
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/cursor.py", line 37, in execute
await self._execute(self._cursor.execute, sql, parameters)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/cursor.py", line 31, in _execute
return await self._conn._execute(fn, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/core.py", line 137, in _execute
return await future
^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/core.py", line 110, in run
result = function()
^^^^^^^^^^
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: INSERT INTO task_run (id, created, updated, name, run_count, total_run_time, task_key, dynamic_key, cache_key, cache_expiration, task_version, flow_run_run_count, empirical_policy, task_inputs, tags, flow_run_id) VALUES (:id, :created, :updated, :name, :run_count, :total_run_time, :task_key, :dynamic_key, :cache_key, :cache_expiration, :task_version, :flow_run_run_count, :empirical_policy, :task_inputs, :tags, :flow_run_id) ON CONFLICT (flow_run_id, task_key, dynamic_key) DO NOTHING]
[parameters: {'id': 'd6372ac9-effe-48bc-a87e-74059ea593e7', 'created': '2023-03-23 17:47:19.015579', 'updated': '2023-03-23 17:47:19.126383', 'name': 'Test Task-80', 'run_count': 0, 'total_run_time': '1970-01-01 00:00:00.000000', 'task_key': '__main__.test_task', 'dynamic_key': '80', 'cache_key': None, 'cache_expiration': None, 'task_version': None, 'flow_run_run_count': 0, 'empirical_policy': '{"max_retries": 0, "retry_delay_seconds": 0.0, "retries": 0, "retry_delay": 0, "retry_jitter_factor": null}', 'task_inputs': '{"my_range": []}', 'tags': '[]', 'flow_run_id': 'e3dafbe4-ee87-4e99-9c69-a83bd593d236'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
Has there been any solution to this issue? I'm facing the same problem.
Hey @erikdao can you provide some more details? There are a few problems reflected here.
Today's release also should resolve some of these with #9632 and #9633
@madkinsz My problem is similar to the OP's problem. In my flow, I basically call a my_task.map(inputs)
, and the inputs is an array of size 5000, which essentially creates 5000 task runs. And I also had the error.
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: https://sqlalche.me/e/14/3o7r)
I was running a local Prefect server with PostgreSQL database. I had created a concurrency limit of 80 for the tag associated with my task.
Version: 2.2.2
Python version: 3.10.9
OS/Arch: MacOS Ventura M1
Profile: default
Server:
Database: postgresql
@madkinsz this is the issue i was running into in #8935 w/ the sqlite backend. seems to affect the postgres backend too. I debuged this a bit more by adding DEBUG logging to the server/client. My settings are added w/ a .env file:
PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://prefect:prefect@localhost:5432/prefect"
PREFECT_LOGGING_SERVER_LEVEL=DEBUG
PREFECT_LOGGING_LEVEL=DEBUG
PREFECT_LOGGING_INTERNAL_LEVEL=DEBUG
Then I run prefect server start &> server.log
in one tab and then python err.py &> err.log
in another to redirect logging output. The client errors out majorly (18M of logs) the first one is the pool error overflow below and subsequent ones are from 500s coming from the server. Ive put all the files+logs into a folder on dropbox so you can check em out. The Pipfile.lock will nail down all the dependencies i have
https://www.dropbox.com/sh/5whhrsarcce6fe6/AACE7g_LJoB-LIlsWwY_hcYLa?dl=0
18:19:33.500 | ERROR | prefect.server - Encountered exception in request:
Traceback (most recent call last):
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 24, in __call__
await responder(scope, receive, send)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 44, in __call__
await self.app(scope, receive, self.send_with_gzip)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/starlette/routing.py", line 718, in __call__
await route.handle(scope, receive, send)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/starlette/routing.py", line 276, in handle
await self.app(scope, receive, send)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/starlette/routing.py", line 66, in app
response = await func(request)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/prefect/server/utilities/server.py", line 103, in handle_response_scoped_depends
response = await default_handler(request)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/fastapi/routing.py", line 237, in app
raw_response = await run_endpoint_function(
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/fastapi/routing.py", line 163, in run_endpoint_function
return await dependant.call(**values)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/prefect/server/api/task_runs.py", line 52, in create_task_run
model = await models.task_runs.create_task_run(
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 119, in async_wrapper
return await fn(*args, **kwargs)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/prefect/server/models/task_runs.py", line 63, in create_task_run
await session.execute(insert_stmt)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 435, in execute
result = await greenlet_spawn(
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 192, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2232, in execute
return self._execute_internal(
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2117, in _execute_internal
conn = self._connection_for_bind(bind)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1984, in _connection_for_bind
return trans._connection_for_bind(engine, execution_options)
File "<string>", line 2, in _connection_for_bind
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/orm/state_changes.py", line 137, in _go
ret_value = fn(self, *arg, **kw)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1111, in _connection_for_bind
conn = bind.connect()
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3269, in connect
return self._connection_cls(self)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 145, in __init__
self._dbapi_connection = engine.raw_connection()
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3293, in raw_connection
return self.pool.connect()
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 452, in connect
return _ConnectionFairy._checkout(self)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 1268, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 716, in checkout
rec = pool._do_get()
File "/Users/jquick/.local/share/virtualenvs/pre-osrBzrNz/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 157, in _do_get
raise exc.TimeoutError(
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: https://sqlalche.me/e/20/3o7r)
@zanieb @justquick Hitting the same error when trying to executes tasks on 200+ dask workers.
Seems like prefect does not allow to configure pool_size
and max_overflow
as indicated in sqlalchemy error messages page so when the flow request too many new connections, the pool reaches a timeout. One could argue that increasing the pool size will not solve the underlying problem where a DB connection is held for too long.
It seems to be caused by the /api/task_runs
endpoint that holds a connection to the DB for a long period of time. Those QueuePool
timeouts seem to appear less when not looking at the flow run page on the UI (probably a lot of /api/task_runs
queries on that page). But not looking at the UI is unfortunately not enough and we eventually reach an API error in the flow itself:
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://prefect-server.prefect.svc.cluster.local:4200/api/task_runs/'
Response: {'exception_message': 'Internal Server Error'}
We should add retries for this error like we did in #9632
This is a common but complex race condition. I'm actually running into it when doing an asyncio queue pool for talking to redis. Do you have a solution to address this in the pool settings? Adding retrys would be a bandaid and still might not fix the issue in subsequent runs since it can affect hundreds of coroutines
@justquick Not sure if that helps but we workaround this by creating several prefect servers pods in our k8s cluster instead of a single one. This will increase the number of connection you can have at the same time on your DB. Still a workaround though...
I need to eventually run 10,000 tasks but in setting up prefect for the first time and running just 100 I am also getting both errors mentioned...
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00
File "asyncpg/protocol/protocol.pyx", line 201, in bind_execute
asyncio.exceptions.TimeoutError
I am using postgres and the server I am testing on has 8 CPUs and 16GB of RAM. When it does run fine there is plenty of headroom. I set concurrency limits and worker pool limit to 50. UPDATE: now 100 after fix below.
my code is similar to this flow...
@task
def task1(item: str):
# make api call; save data
return featherfilepath
@task
def task2(item: str):
# process stats; save data
return featherfilepath
@flow(task_runner=ConcurrentTaskRunner()) # flowrun task1
def map_task1():
items = feather_instance.get("items")
items = items['name'].tolist()
task1_multiple = task1.map(item=items)
@flow(task_runner=ConcurrentTaskRunner()) # flowrun task2
def map_task2():
items = feather_instance.get("items")
items = items['name'].tolist()
task2_multiple = task2.map(item=items)
@flow(task_runner=SequentialTaskRunner())
def flowrun():
map_task1()
map_task2()
if __name__ == "__main__":
flowrun()
UPDATE: i just did a few runs in a row without a single error. Not sure if just coincidence or the following "AI recommendations" fixed it or not. Not sure how to test is this config.toml is even doing anything? I also upped limits in postgres. I am no expert though.
$ cat ~/.prefect/config.toml
[server.database]
url = "postgresql://prefect:prefect@localhost/prefect"
pool_size = 80
max_overflow = 90
pool_timeout=240 # 4 minutes
$ nano /etc/postgresql/12/main/postgresql.conf;
max_connections = 1000 # depending on your needs, don't set too high to avoid excessive resource usage
shared_buffers = 4GB # 25% of your system's RAM
effective_cache_size = 8GB # 50% of your system's RAM
work_mem = 16MB # depends on your workload, monitor and adjust accordingly
maintenance_work_mem = 1GB # for maintenance operations like VACUUM, CREATE INDEX, etc.
I was having the same problem. It was confusing because I was not getting any failures on my first several flow runs and then suddenly I couldn't get successfully complete any flow run without this popping up.
I tried all the solutions I could find here and in other threads but the only thing that seems to have fixed my issue for now was simply truncating the task_run table in Postgres. My guess is the task_run table grew so large (it was over 500k records before truncation) that the insert statements for creating a new task became significantly less performant which started tripping the timeout error.
Ultimately, I'm happy I found a fix for this, but I'm not sure what the long term solution would look like. I do see quite a few indexes on that table which I'm sure is contributing to the slow inserts once the table grows large enough, so maybe the ORM could drop them at the start of a flow run and recreate them after all the tasks are finished? I'm curious what other solutions the dev team might have.
Edit: Even my truncate workaround stopped working. I was really hoping to use Prefect's task-level caching and monitoring, but I don't think a local deployment is capable of reliably handling my level of task concurrency. I ended up rebuilding my flow so there is only 1 task responsible for executing all my async functions and I built my own custom caching logic and now my flow is finally able to finish successfully. I'm bummed because for my use-case it seems theres hardly any benefit to using Prefect over just scheduling jobs via crontab.
I've opened this PR which I believe addresses this issue. Would love to see this merged into the main
branch and released.