Robustness issue in 3.4.1 with how container flows communicate events back to the server
Bug summary
Sometimes (20-30% of runs) the flow fails to start with a cryptic error. Other times the flow runs (and the UI shows success), but some final cleanup operations fail, which we can see in the ECS task logs. The latter seems benign, but the failure to start is clearly a problem.
The flow is a simple hello world with no dependencies and no input parameters, running from a Docker image on ECS via the ECS worker (https://reference.prefect.io/prefect_aws/workers/ecs_worker/) against a self-hosted Prefect server.
```python
from prefect import flow
from prefect.logging import get_run_logger


@flow
def hello_world():
    logger = get_run_logger()
    logger.info("Hello from ECS!!")
```
Unfortunately, I have not yet been able to produce an MRE (minimal reproducible example) for this, or anything simple that reproduces it. However, I do know that taking the latest HEAD commit (27cccbf654) and reverting 6869a02a04 and 378fc4f6b7 eliminates the issue. I believe the commit that introduced the problem is 378fc4f6b7.
I debated whether to submit this without an MRE, but figured I'd at least get the information out there. Maybe a second look at 378fc4f6b7 turns something up, or another issue brings more information that helps triangulate the problem. Feel free to close if this is not deemed helpful.
Version info
Version: 3.4.1
API version: 0.8.4
Python version: 3.12.10
Git commit: b47ad8e1
Built: Thu, May 08, 2025 08:42 PM
OS/Arch: linux/x86_64
Profile: ephemeral
Server type: ephemeral
Pydantic version: 2.11.4
Server:
  Database: sqlite
  SQLite version: 3.40.1
Integrations:
  prefect-aws: 0.5.10
Additional context
An example error from the ECS task logs, from a run that failed to start:
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| timestamp | message |
|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1747197365272 | 04:36:05.267 | INFO | prefect.flow_runs.runner - Opening process... |
| 1747197365346 | 04:36:05.344 | INFO | prefect.flow_runs.runner - Loading flow to check for on_cancellation hooks |
| 1747197365415 | 04:36:05.413 | INFO | prefect.flow_runs.runner - Loading flow to check for on_cancellation hooks |
| 1747197365512 | 04:36:05.511 | INFO | prefect.flow_runs.runner - Loading flow to check for on_cancellation hooks |
| 1747197365546 | 04:36:05.545 | INFO | prefect.flow_runs.runner - Loading flow to check for on_cancellation hooks |
| 1747197365636 | 04:36:05.635 | INFO | prefect.flow_runs.runner - Loading flow to check for on_cancellation hooks |
| 1747197365751 | 04:36:05.749 | WARNING | prefect.flow_runs.runner - Runner failed to retrieve flow to execute on_cancellation hooks for flow run UUID('fbc41e6a-adac-41ab-a006-62450513ad79'). |
| 1747197365754 | 04:36:05.752 | WARNING | prefect.flow_runs.runner - Runner failed to retrieve flow to execute on_cancellation hooks for flow run UUID('fbc41e6a-adac-41ab-a006-62450513ad79'). |
| 1747197365755 | 04:36:05.754 | WARNING | prefect.flow_runs.runner - Runner failed to retrieve flow to execute on_cancellation hooks for flow run UUID('fbc41e6a-adac-41ab-a006-62450513ad79'). |
| 1747197365804 | + Exception Group Traceback (most recent call last): |
| 1747197365804 | | File "/usr/local/lib/python3.12/site-packages/prefect/cli/_utilities.py", line 44, in wrapper |
| 1747197365804 | | return fn(*args, **kwargs) |
| 1747197365804 | | ^^^^^^^^^^^^^^^^^^^ |
| 1747197365804 | | File "/usr/local/lib/python3.12/site-packages/prefect/cli/_types.py", line 156, in sync_fn |
| 1747197365804 | | return asyncio.run(async_fn(*args, **kwargs)) |
| 1747197365804 | | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
| 1747197365804 | | File "/usr/local/lib/python3.12/asyncio/runners.py", line 195, in run |
| 1747197365804 | | return runner.run(main) |
| 1747197365804 | | ^^^^^^^^^^^^^^^^ |
| 1747197365804 | | File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run |
| 1747197365804 | | return self._loop.run_until_complete(task) |
| 1747197365804 | | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
| 1747197365804 | | File "/usr/local/lib/python3.12/asyncio/base_events.py", line 691, in run_until_complete |
| 1747197365804 | | return future.result() |
| 1747197365804 | | ^^^^^^^^^^^^^^^ |
| 1747197365804 | | File "/usr/local/lib/python3.12/site-packages/prefect/cli/flow_run.py", line 375, in execute |
| 1747197365804 | | await runner.execute_flow_run(id) |
| 1747197365804 | | File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 571, in execute_flow_run |
| 1747197365804 | | async with context: |
| 1747197365804 | | ^^^^^^^ |
| 1747197365804 | | File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 1542, in __aexit__ |
| 1747197365804 | | await self._exit_stack.__aexit__(*exc_info) |
| 1747197365804 | | File "/usr/local/lib/python3.12/contextlib.py", line 754, in __aexit__ |
| 1747197365804 | | raise exc_details[1] |
| 1747197365804 | | File "/usr/local/lib/python3.12/contextlib.py", line 737, in __aexit__ |
| 1747197365804 | | cb_suppress = await cb(*exc_details) |
| 1747197365804 | | ^^^^^^^^^^^^^^^^^^^^^^ |
| 1747197365804 | | File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__ |
| 1747197365804 | | raise BaseExceptionGroup( |
| 1747197365804 | | ExceptionGroup: unhandled errors in a TaskGroup (3 sub-exceptions) |
| 1747197365804 | +-+---------------- 1 ---------------- |
| 1747197365804 | | Traceback (most recent call last): |
| 1747197365805 | | File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 973, in _cancel_run |
| 1747197365805 | | await self._mark_flow_run_as_cancelled( |
| 1747197365805 | | File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 1435, in _mark_flow_run_as_cancelled |
| 1747197365805 | | await self._client.set_flow_run_state(flow_run.id, state, force=True) |
| 1747197365805 | | File "/usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_flow_runs/client.py", line 804, in set_flow_run_state |
| 1747197365805 | | state=state_create.model_dump(mode="json", serialize_as_any=True), |
| 1747197365805 | | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
| 1747197365805 | | File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 463, in model_dump |
| 1747197365805 | | return self.__pydantic_serializer__.to_python( |
| 1747197365805 | | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
| 1747197365805 | | TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer' |
| 1747197365805 | +---------------- 2 ---------------- |
| 1747197365805 | | Traceback (most recent call last): |
| 1747197365805 | | File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 973, in _cancel_run |
| 1747197365805 | | await self._mark_flow_run_as_cancelled( |
| 1747197365805 | | File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 1435, in _mark_flow_run_as_cancelled |
| 1747197365805 | | await self._client.set_flow_run_state(flow_run.id, state, force=True) |
| 1747197365805 | | File "/usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_flow_runs/client.py", line 804, in set_flow_run_state |
| 1747197365805 | | state=state_create.model_dump(mode="json", serialize_as_any=True), |
| 1747197365805 | | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
| 1747197365805 | | File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 463, in model_dump |
| 1747197365805 | | return self.__pydantic_serializer__.to_python( |
| 1747197365805 | | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
| 1747197365805 | | TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer' |
| 1747197365805 | +---------------- 3 ---------------- |
| 1747197365805 | | Traceback (most recent call last): |
| 1747197365805 | | File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 973, in _cancel_run |
| 1747197365805 | | await self._mark_flow_run_as_cancelled( |
| 1747197365805 | | File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 1435, in _mark_flow_run_as_cancelled |
| 1747197365805 | | await self._client.set_flow_run_state(flow_run.id, state, force=True) |
| 1747197365805 | | File "/usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_flow_runs/client.py", line 804, in set_flow_run_state |
| 1747197365805 | | state=state_create.model_dump(mode="json", serialize_as_any=True), |
| 1747197365805 | | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
| 1747197365805 | | File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 463, in model_dump |
| 1747197365805 | | return self.__pydantic_serializer__.to_python( |
| 1747197365805 | | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
| 1747197365805 | | TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer' |
| 1747197365805 | +------------------------------------ |
| 1747197365805 | An exception occurred. |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Hi @jeffcarrico, thanks for the issue! Do you know for sure that 2.11.4 is the actual version of Pydantic where you're encountering this? I've seen traces like this before on older Pydantic versions.
@zzstoatzz Yes, Pydantic is 2.11.4.
Two more pieces of information:
- The server has PREFECT_RESULTS_PERSIST_BY_DEFAULT=true with a PREFECT_DEFAULT_RESULT_STORAGE_BLOCK configured
- Below is the log from a run where the flow completes successfully but post-flow errors occur. The AttributeError is what led me to look at #17973 and run the experiment where I reverted it.
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| timestamp | message |
|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1747253859144 | 20:17:39.139 | INFO | prefect.flow_runs.runner - Opening process... |
| 1747253860463 | 20:17:40.458 | INFO | Flow run 'slim-aardwolf' - > Running set_working_directory step... |
| 1747253860968 | 20:17:40.966 | INFO | Flow run 'slim-aardwolf' - Beginning flow run 'slim-aardwolf' for flow 'hello-world' |
| 1747253860969 | 20:17:40.968 | INFO | Flow run 'slim-aardwolf' - View at https://my-prefect-server.com/prefect/runs/flow-run/868adaf8-9135-4fab-b310-fc9129622718 |
| 1747253860969 | 20:17:40.968 | INFO | Flow run 'slim-aardwolf' - Hello from ECS!! |
| 1747253861286 | 20:17:41.284 | INFO | Flow run 'slim-aardwolf' - Finished in state Completed() |
| 1747253861509 | 20:17:41.508 | INFO | prefect.flow_runs.runner - Process for flow run 'slim-aardwolf' exited cleanly. |
| 1747253861570 | 20:17:41.567 | ERROR | prefect.FlowRunCancellingObserver - Error consuming events |
| 1747253861570 | Traceback (most recent call last): |
| 1747253861570 | File "/usr/local/lib/python3.12/site-packages/prefect/runner/_observers.py", line 56, in __aexit__ |
| 1747253861570 | await self._consumer_task |
| 1747253861570 | File "/usr/local/lib/python3.12/site-packages/prefect/runner/_observers.py", line 34, in _consume_events |
| 1747253861570 | self.on_cancelling(flow_run_id) |
| 1747253861570 | File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 1506, in <lambda> |
| 1747253861570 | on_cancelling=lambda flow_run_id: self._runs_task_group.start_soon( |
| 1747253861570 | ^^^^^^^^^^^^^^^^^^^^^ |
| 1747253861570 | AttributeError: 'Runner' object has no attribute '_runs_task_group'. Did you mean: '_loops_task_group'? |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
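The lambda in that traceback captures `self` and only looks up `_runs_task_group` when the observer finally invokes it, so if the attribute has not been assigned by then, the call fails with exactly this AttributeError. A stripped-down illustration of that failure mode, assuming (not verified against the Prefect source) that the attribute is only assigned on some code paths, such as a long-running `start()` path and not the one-off `execute_flow_run` path; `SketchRunner` and its guarded callback are hypothetical:

```python
class SketchRunner:
    """Hypothetical, heavily simplified stand-in for a runner (not Prefect's Runner)."""

    def __init__(self) -> None:
        self._loops_task_group = "loops"  # assigned on every code path
        self.started: list[str] = []

    def start(self) -> None:
        # Only this code path assigns the attribute the callback relies on.
        self._runs_task_group = "runs"

    def make_callback(self):
        # The lambda captures `self`, not `self._runs_task_group`; the attribute
        # lookup happens only when the callback is eventually invoked.
        return lambda run_id: self.started.append(f"{self._runs_task_group}:{run_id}")

    def make_guarded_callback(self):
        def on_cancelling(run_id: str) -> bool:
            group = getattr(self, "_runs_task_group", None)
            if group is None:
                return False  # nothing to schedule onto: skip instead of raising
            self.started.append(f"{group}:{run_id}")
            return True

        return on_cancelling


if __name__ == "__main__":
    runner = SketchRunner()  # start() never called, like a one-off execution
    try:
        runner.make_callback()("run-1")
    except AttributeError as exc:
        print(f"unguarded callback failed: {exc}")
    print(runner.make_guarded_callback()("run-1"))  # False
```

Whether skipping is the right behavior for a real runner is a separate question; the sketch only shows why the error appears after the process has already exited cleanly.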
I just tested with `@flow(persist_result=False)` and still got the crash-on-startup scenario whose log I showed in the OP, so I don't think result persistence is related.
We've run into the same issue as well, on Pydantic 2.10.4. Somehow it doesn't seem to affect all of our flows.
We are still getting this issue, even with Pydantic 2.11.4. Sometimes we lose a dozen flow runs this way. Any ideas on what we could try next?