
Robustness issue in 3.4.1 with how container flows communicate events back to the server

Open jeffcarrico opened this issue 11 months ago • 4 comments

Bug summary

The issue shows up in two ways. Sometimes (20-30% of the time) the flow fails to start with a cryptic error. Other times the flow runs (and the UI shows success), but some final cleanup operations fail, which we can see in the ECS task logs. The latter seems benign, but the failure to start is obviously a problem.

The flow is a simple hello world with no dependencies and no input parameters, running from a Docker image on ECS against a self-hosted Prefect server. ECS worker reference: https://reference.prefect.io/prefect_aws/workers/ecs_worker/

from prefect import flow
from prefect.logging import get_run_logger

@flow
def hello_world():
    logger = get_run_logger()
    logger.info("Hello from ECS!!")

Unfortunately, I have not yet been able to produce an MRE for this, or any simple way to reproduce it. However, I do know that taking the latest HEAD commit (27cccbf654) and reverting 6869a02a04 and 378fc4f6b7 eliminates the issue. I believe the commit that introduced the problem is 378fc4f6b7.

I debated whether to submit this without the MRE, but figured I'd at least get the information out there. Maybe a second look at 378fc4f6b7 turns something up, or another issue brings more information that helps triangulate the problem. Feel free to close if this is not deemed helpful.

Version info

Version:             3.4.1
API version:         0.8.4
Python version:      3.12.10
Git commit:          b47ad8e1
Built:               Thu, May 08, 2025 08:42 PM
OS/Arch:             linux/x86_64
Profile:             ephemeral
Server type:         ephemeral
Pydantic version:    2.11.4
Server:
  Database:          sqlite
  SQLite version:    3.40.1
Integrations:
  prefect-aws:       0.5.10

Additional context

An example error message from the ECS task logs

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|   timestamp   |                                                                                    message                                                                                     |
|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1747197365272 | 04:36:05.267 | INFO    | prefect.flow_runs.runner - Opening process...                                                                                                         |
| 1747197365346 | 04:36:05.344 | INFO    | prefect.flow_runs.runner - Loading flow to check for on_cancellation hooks                                                                            |
| 1747197365415 | 04:36:05.413 | INFO    | prefect.flow_runs.runner - Loading flow to check for on_cancellation hooks                                                                            |
| 1747197365512 | 04:36:05.511 | INFO    | prefect.flow_runs.runner - Loading flow to check for on_cancellation hooks                                                                            |
| 1747197365546 | 04:36:05.545 | INFO    | prefect.flow_runs.runner - Loading flow to check for on_cancellation hooks                                                                            |
| 1747197365636 | 04:36:05.635 | INFO    | prefect.flow_runs.runner - Loading flow to check for on_cancellation hooks                                                                            |
| 1747197365751 | 04:36:05.749 | WARNING | prefect.flow_runs.runner - Runner failed to retrieve flow to execute on_cancellation hooks for flow run UUID('fbc41e6a-adac-41ab-a006-62450513ad79'). |
| 1747197365754 | 04:36:05.752 | WARNING | prefect.flow_runs.runner - Runner failed to retrieve flow to execute on_cancellation hooks for flow run UUID('fbc41e6a-adac-41ab-a006-62450513ad79'). |
| 1747197365755 | 04:36:05.754 | WARNING | prefect.flow_runs.runner - Runner failed to retrieve flow to execute on_cancellation hooks for flow run UUID('fbc41e6a-adac-41ab-a006-62450513ad79'). |
| 1747197365804 |   + Exception Group Traceback (most recent call last):                                                                                                                         |
| 1747197365804 |   |   File "/usr/local/lib/python3.12/site-packages/prefect/cli/_utilities.py", line 44, in wrapper                                                                            |
| 1747197365804 |   |     return fn(*args, **kwargs)                                                                                                                                             |
| 1747197365804 |   |            ^^^^^^^^^^^^^^^^^^^                                                                                                                                             |
| 1747197365804 |   |   File "/usr/local/lib/python3.12/site-packages/prefect/cli/_types.py", line 156, in sync_fn                                                                               |
| 1747197365804 |   |     return asyncio.run(async_fn(*args, **kwargs))                                                                                                                          |
| 1747197365804 |   |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                                          |
| 1747197365804 |   |   File "/usr/local/lib/python3.12/asyncio/runners.py", line 195, in run                                                                                                    |
| 1747197365804 |   |     return runner.run(main)                                                                                                                                                |
| 1747197365804 |   |            ^^^^^^^^^^^^^^^^                                                                                                                                                |
| 1747197365804 |   |   File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run                                                                                                    |
| 1747197365804 |   |     return self._loop.run_until_complete(task)                                                                                                                             |
| 1747197365804 |   |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                                             |
| 1747197365804 |   |   File "/usr/local/lib/python3.12/asyncio/base_events.py", line 691, in run_until_complete                                                                                 |
| 1747197365804 |   |     return future.result()                                                                                                                                                 |
| 1747197365804 |   |            ^^^^^^^^^^^^^^^                                                                                                                                                 |
| 1747197365804 |   |   File "/usr/local/lib/python3.12/site-packages/prefect/cli/flow_run.py", line 375, in execute                                                                             |
| 1747197365804 |   |     await runner.execute_flow_run(id)                                                                                                                                      |
| 1747197365804 |   |   File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 571, in execute_flow_run                                                                   |
| 1747197365804 |   |     async with context:                                                                                                                                                    |
| 1747197365804 |   |                ^^^^^^^                                                                                                                                                     |
| 1747197365804 |   |   File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 1542, in __aexit__                                                                         |
| 1747197365804 |   |     await self._exit_stack.__aexit__(*exc_info)                                                                                                                            |
| 1747197365804 |   |   File "/usr/local/lib/python3.12/contextlib.py", line 754, in __aexit__                                                                                                   |
| 1747197365804 |   |     raise exc_details[1]                                                                                                                                                   |
| 1747197365804 |   |   File "/usr/local/lib/python3.12/contextlib.py", line 737, in __aexit__                                                                                                   |
| 1747197365804 |   |     cb_suppress = await cb(*exc_details)                                                                                                                                   |
| 1747197365804 |   |                   ^^^^^^^^^^^^^^^^^^^^^^                                                                                                                                   |
| 1747197365804 |   |   File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__                                                                       |
| 1747197365804 |   |     raise BaseExceptionGroup(                                                                                                                                              |
| 1747197365804 |   | ExceptionGroup: unhandled errors in a TaskGroup (3 sub-exceptions)                                                                                                         |
| 1747197365804 |   +-+---------------- 1 ----------------                                                                                                                                       |
| 1747197365804 |     | Traceback (most recent call last):                                                                                                                                       |
| 1747197365805 |     |   File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 973, in _cancel_run                                                                      |
| 1747197365805 |     |     await self._mark_flow_run_as_cancelled(                                                                                                                              |
| 1747197365805 |     |   File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 1435, in _mark_flow_run_as_cancelled                                                     |
| 1747197365805 |     |     await self._client.set_flow_run_state(flow_run.id, state, force=True)                                                                                                |
| 1747197365805 |     |   File "/usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_flow_runs/client.py", line 804, in set_flow_run_state                                      |
| 1747197365805 |     |     state=state_create.model_dump(mode="json", serialize_as_any=True),                                                                                                   |
| 1747197365805 |     |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                    |
| 1747197365805 |     |   File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 463, in model_dump                                                                               |
| 1747197365805 |     |     return self.__pydantic_serializer__.to_python(                                                                                                                       |
| 1747197365805 |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                                       |
| 1747197365805 |     | TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'                                                                                                 |
| 1747197365805 |     +---------------- 2 ----------------                                                                                                                                       |
| 1747197365805 |     | Traceback (most recent call last):                                                                                                                                       |
| 1747197365805 |     |   File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 973, in _cancel_run                                                                      |
| 1747197365805 |     |     await self._mark_flow_run_as_cancelled(                                                                                                                              |
| 1747197365805 |     |   File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 1435, in _mark_flow_run_as_cancelled                                                     |
| 1747197365805 |     |     await self._client.set_flow_run_state(flow_run.id, state, force=True)                                                                                                |
| 1747197365805 |     |   File "/usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_flow_runs/client.py", line 804, in set_flow_run_state                                      |
| 1747197365805 |     |     state=state_create.model_dump(mode="json", serialize_as_any=True),                                                                                                   |
| 1747197365805 |     |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                    |
| 1747197365805 |     |   File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 463, in model_dump                                                                               |
| 1747197365805 |     |     return self.__pydantic_serializer__.to_python(                                                                                                                       |
| 1747197365805 |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                                       |
| 1747197365805 |     | TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'                                                                                                 |
| 1747197365805 |     +---------------- 3 ----------------                                                                                                                                       |
| 1747197365805 |     | Traceback (most recent call last):                                                                                                                                       |
| 1747197365805 |     |   File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 973, in _cancel_run                                                                      |
| 1747197365805 |     |     await self._mark_flow_run_as_cancelled(                                                                                                                              |
| 1747197365805 |     |   File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 1435, in _mark_flow_run_as_cancelled                                                     |
| 1747197365805 |     |     await self._client.set_flow_run_state(flow_run.id, state, force=True)                                                                                                |
| 1747197365805 |     |   File "/usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_flow_runs/client.py", line 804, in set_flow_run_state                                      |
| 1747197365805 |     |     state=state_create.model_dump(mode="json", serialize_as_any=True),                                                                                                   |
| 1747197365805 |     |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                    |
| 1747197365805 |     |   File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 463, in model_dump                                                                               |
| 1747197365805 |     |     return self.__pydantic_serializer__.to_python(                                                                                                                       |
| 1747197365805 |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                                       |
| 1747197365805 |     | TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'                                                                                                 |
| 1747197365805 |     +------------------------------------                                                                                                                                      |
| 1747197365805 | An exception occurred.                                                                                                                                                         |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

jeffcarrico avatar May 14 '25 20:05 jeffcarrico

hi @jeffcarrico - thanks for the issue! do you know for sure that 2.11.4 is the actual version of pydantic where you're encountering this? I've seen traces like this before on lower pydantic versions
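One way to confirm this is to ask the interpreter inside the running container directly, rather than relying on version output collected elsewhere. A minimal sketch using only the standard library; the helper name `installed_version` is made up for illustration, and the package list is just the set mentioned in this issue plus `pydantic-core`:

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Run this with the same interpreter and image that execute the flow.
    for name in ("pydantic", "pydantic-core", "prefect", "prefect-aws"):
        print(f"{name}: {installed_version(name)}")
```

Running it as the container's entrypoint (or at the top of the flow module) should show whether the image resolves the same Pydantic version as the one reported above.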

zzstoatzz avatar May 14 '25 20:05 zzstoatzz

@zzstoatzz Yes pydantic is 2.11.4

Two more pieces of information:

  1. The server has PREFECT_RESULTS_PERSIST_BY_DEFAULT=true with a PREFECT_DEFAULT_RESULT_STORAGE_BLOCK configured
  2. This is the log from a run where the flow completes successfully but there are post-flow errors. The AttributeError is what led me to look at #17973 and run the experiment where I reverted it:
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|   timestamp   |                                                                              message                                                                              |
|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1747253859144 | 20:17:39.139 | INFO    | prefect.flow_runs.runner - Opening process...                                                                                            |
| 1747253860463 | 20:17:40.458 | INFO    | Flow run 'slim-aardwolf' -  > Running set_working_directory step...                                                                      |
| 1747253860968 | 20:17:40.966 | INFO    | Flow run 'slim-aardwolf' - Beginning flow run 'slim-aardwolf' for flow 'hello-world'                                                     |
| 1747253860969 | 20:17:40.968 | INFO    | Flow run 'slim-aardwolf' - View at https://my-prefect-server.com/prefect/runs/flow-run/868adaf8-9135-4fab-b310-fc9129622718 |
| 1747253860969 | 20:17:40.968 | INFO    | Flow run 'slim-aardwolf' - Hello from ECS!!                                                                                              |
| 1747253861286 | 20:17:41.284 | INFO    | Flow run 'slim-aardwolf' - Finished in state Completed()                                                                                 |
| 1747253861509 | 20:17:41.508 | INFO    | prefect.flow_runs.runner - Process for flow run 'slim-aardwolf' exited cleanly.                                                          |
| 1747253861570 | 20:17:41.567 | ERROR   | prefect.FlowRunCancellingObserver - Error consuming events                                                                               |
| 1747253861570 | Traceback (most recent call last):                                                                                                                                |
| 1747253861570 |   File "/usr/local/lib/python3.12/site-packages/prefect/runner/_observers.py", line 56, in __aexit__                                                              |
| 1747253861570 |     await self._consumer_task                                                                                                                                     |
| 1747253861570 |   File "/usr/local/lib/python3.12/site-packages/prefect/runner/_observers.py", line 34, in _consume_events                                                        |
| 1747253861570 |     self.on_cancelling(flow_run_id)                                                                                                                               |
| 1747253861570 |   File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 1506, in <lambda>                                                                 |
| 1747253861570 |     on_cancelling=lambda flow_run_id: self._runs_task_group.start_soon(                                                                                           |
| 1747253861570 |                                       ^^^^^^^^^^^^^^^^^^^^^                                                                                                       |
| 1747253861570 | AttributeError: 'Runner' object has no attribute '_runs_task_group'. Did you mean: '_loops_task_group'?                                                           |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
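For readers unfamiliar with this class of error, here is a generic sketch of how a callback can hit a missing attribute when it fires outside the window in which that attribute exists. This is not Prefect's code and not a diagnosis of this bug: `SketchRunner` and its methods are hypothetical stand-ins, and the only detail borrowed from the traceback is the attribute name `_runs_task_group`.

```python
import asyncio


class SketchRunner:
    """Hypothetical stand-in for a runner; NOT Prefect's actual Runner class."""

    def __init__(self) -> None:
        self.scheduled: list = []
        # _runs_task_group is deliberately not set here, only in __aenter__.

    async def __aenter__(self) -> "SketchRunner":
        # Stand-in for creating a real task group on entry.
        self._runs_task_group = self.scheduled
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        del self._runs_task_group

    def on_cancelling(self, flow_run_id: str) -> None:
        # Raises AttributeError if called outside the `async with` block.
        self._runs_task_group.append(flow_run_id)

    def on_cancelling_guarded(self, flow_run_id: str) -> bool:
        # Defensive variant: report False instead of raising when the
        # group does not exist (before entry or after exit).
        group = getattr(self, "_runs_task_group", None)
        if group is None:
            return False
        group.append(flow_run_id)
        return True


async def demo() -> dict:
    runner = SketchRunner()
    outcome = {}
    try:
        runner.on_cancelling("run-1")  # event arrives before setup finished
    except AttributeError as exc:
        outcome["unguarded"] = type(exc).__name__
    outcome["guarded_outside"] = runner.on_cancelling_guarded("run-1")
    async with runner:
        outcome["guarded_inside"] = runner.on_cancelling_guarded("run-2")
    return outcome


result = asyncio.run(demo())
```

Whether the real failure comes from an event arriving before setup, after teardown, or from something else entirely is not established in this thread; the sketch only shows why the timing of such a callback matters.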

jeffcarrico avatar May 14 '25 20:05 jeffcarrico

I just tested with @flow(persist_result=False) and got the crash-on-startup scenario I showed the log for in the OP, so I don't think results persistence is related.

jeffcarrico avatar May 14 '25 20:05 jeffcarrico

We've run into the same issue as well. Doesn't seem to affect all of our flows somehow.

On Pydantic version 2.10.4

bkkkk avatar Jun 01 '25 02:06 bkkkk

We are still getting this issue, even with Pydantic 2.11.4. Sometimes we lose a dozen flow runs this way. Any ideas of what we could try next?

bkkkk avatar Jun 24 '25 03:06 bkkkk