When a process worker goes down, mark running tasks/flows as "Crashed"
Describe the current behavior
When a process worker is shut down, any flows it was running at the time stay in the "Running" state indefinitely. If the work pool has a concurrency limit, that limit is effectively reduced by the number of hanging flows.
To reproduce:
- Install docker and docker compose
- Copy the files below into the same directory
- Run docker compose up. If this fails, it is probably because the database didn't have enough time to initialize. Simply wait a few seconds and run docker compose up again.
- Wait for a flow run to be triggered and for the worker to start it.
- While the flow is still running, hit Ctrl-C to stop the containers.
- Run docker compose up again and wait for everything to initialize.
- View the Prefect server UI at http://localhost:4200. You will see a flow run stuck in "Running" that never ends. Over time, new scheduled flow runs will be added but they will never run because the work pool has a concurrency limit of 1.
Files used (put these in the same directory):
docker-compose.yml:
version: '3.7'

services:
  prefect_test_db:
    image: postgres:17.0-alpine
    volumes:
      - prefect_test_db_data:/var/lib/postgresql/data
    environment:
      - POSTGRES_NAME=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=prefect

  prefect_test_server:
    image: prefecthq/prefect:3-python3.9
    environment:
      - PREFECT_HOME=/data
      - PREFECT_API_DATABASE_CONNECTION_URL=postgresql+asyncpg://postgres:postgres@prefect_test_db/prefect
    command: prefect server start --host 0.0.0.0
    volumes:
      - prefect_test_data:/data
    ports:
      - 4200:4200
    depends_on:
      - prefect_test_db

  prefect_test_worker:
    image: prefecthq/prefect:3-python3.9
    environment:
      - PREFECT_API_URL=http://prefect_test_server:4200/api
      - PREFECT_LOGGING_LOG_PRINTS=true
    command: >
      sh -c "sleep 5 &&
      prefect work-pool create test_pool --type process &&
      prefect work-pool set-concurrency-limit test_pool 1 &&
      prefect deploy --prefect-file /data/prefect.yaml &&
      prefect worker start --pool test_pool --type process"
    volumes:
      - .:/data
    depends_on:
      - prefect_test_server

volumes:
  prefect_test_db_data:
  prefect_test_data:
prefect.yaml:
# Welcome to your prefect.yaml file! You can use this file for storing and managing
# configuration for deploying your flows. We recommend committing this file to source
# control along with your flow code.

# Generic metadata about this project
name: prefect_deployments
prefect-version: 3.1.12

# build section allows you to manage and build docker images
build: null

# push section allows you to manage if and how this project is uploaded to remote locations
push: null

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /data

# the deployments section allows you to provide configuration for deploying flows
deployments:
  - name: "test_deployment"
    version: "1"
    tags: []
    description: "Run the test flow"
    schedule:
      interval: 20
    flow_name: "test_flow"
    entrypoint: /data/flows.py:test_flow
    parameters: {}
    work_pool:
      name: "test_pool"
      work_queue_name: "test_pool"
      job_variables: {}
flows.py:
from prefect import flow
from time import sleep


@flow
def test_flow():
    print("Running flow")
    sleep(15)
Describe the proposed behavior
When a process worker crashes or is shut down gracefully, mark any flows it was running as "Crashed".
Example Use
This is a common problem during development when using docker compose to bring up and shut down a development stack with long-running processes. Anything still running when docker compose down is executed will hang indefinitely.
Additional context
Some context from @cicdw on Slack:
Our current recommendation for these situations is to set up a zombie-killer automation as described here: https://docs.prefect.io/v3/automate/events/automations-triggers#detect-and-respond-to-zombie-flows. It was actually an intentional design decision to not couple submitted work to worker health as a form of fault tolerance, but honestly this decision doesn't make much sense for the Process Worker specifically because the work is in fact coupled; I think we can look to add an attempt to cancel subprocesses when that particular worker shuts down gracefully.
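For reference, the approach in that doc amounts to enabling flow-run heartbeats (e.g. PREFECT_RUNNER_HEARTBEAT_FREQUENCY=30) and creating an automation that flips a run to Crashed when heartbeats stop arriving. Below is a rough, untested sketch of that shape; the module paths, field names, and the 90-second window are taken from the docs' pattern and should be checked against your installed Prefect version:

# Sketch of the "zombie-killer" automation described in the linked doc.
# Assumes flow-run heartbeats are enabled, e.g. PREFECT_RUNNER_HEARTBEAT_FREQUENCY=30.
from datetime import timedelta

from prefect.automations import Automation
from prefect.client.schemas.objects import StateType
from prefect.events.actions import ChangeFlowRunState
from prefect.events.schemas.automations import EventTrigger, Posture
from prefect.events.schemas.events import ResourceSpecification

zombie_killer = Automation(
    name="Crash zombie flow runs",
    description="Mark flow runs as Crashed when heartbeats stop arriving",
    trigger=EventTrigger(
        # Proactively fire when heartbeats were seen but then stop,
        # and no terminal-state event arrives within the window
        after={"prefect.flow-run.heartbeat"},
        expect={
            "prefect.flow-run.heartbeat",
            "prefect.flow-run.Completed",
            "prefect.flow-run.Failed",
            "prefect.flow-run.Cancelled",
            "prefect.flow-run.Crashed",
        },
        match=ResourceSpecification({"prefect.resource.id": ["prefect.flow-run.*"]}),
        for_each={"prefect.resource.id"},
        posture=Posture.Proactive,
        threshold=1,
        within=timedelta(seconds=90),  # roughly three missed 30s heartbeats
    ),
    actions=[
        ChangeFlowRunState(
            state=StateType.CRASHED,
            message="Flow run marked as crashed: no heartbeats received.",
        )
    ],
)

if __name__ == "__main__":
    zombie_killer.create()

This is a workaround rather than a fix: it only catches runs that have already sent at least one heartbeat.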
@leesavoie-voltaiq I'm having trouble reproducing - if I kill my worker with CTRL+C, I get the same Crashed state as if I killed a served flow with CTRL+C; what signal is being sent to kill the worker in your case / any other information that might help me reproduce?
Maybe this only applies to workers that go down cleanly. My process is something like this:
- Run docker compose up
- Wait for a flow run to start
- Hit Ctrl+C in the same terminal that I ran docker compose up in (thus ending the docker compose session)
I think the difference may be that I'm bringing down the worker cleanly and you are killing it.
I'm running this on an M2 Mac, if that makes a difference.
I am having a similar problem. I am also using a work pool of type "process". My expectation is that if a worker fails while running a flow, the execution would be picked up by another available worker. I find it jarring that this is not the default behavior.
I have also encountered this issue. While debugging, I found that the behavior is caused by anyio._backends._asyncio.Process (anyio version 4.9.0). The key part of the code is:
async def aclose(self) -> None:
    with CancelScope(shield=True) as scope:
        if self._stdin:
            await self._stdin.aclose()
        if self._stdout:
            await self._stdout.aclose()
        if self._stderr:
            await self._stderr.aclose()

        scope.shield = False
        try:
            await self.wait()
        except BaseException:  # This captures CancelledError, causing the subprocess to be killed immediately
            scope.shield = True
            self.kill()
            await self.wait()
            raise
Because CancelledError is caught here, the subprocess is immediately terminated, preventing it from performing any exit cleanup.
I tried modifying this class by removing the scope.shield assignments, and with that change, the process was able to exit cleanly. I'm not sure if this is an issue with anyio, but I hope this information helps.
@wangxing-git that is very helpful thank you!
@wangxing-git would you mind testing against this branch and letting me know if it resolves the issue for you?
Also, if anyone else could test against this branch it would be appreciated!
The branch does not resolve the issue for me. Below is a custom Process implementation I used to work around the problem:
class CustomProcess(Process):
    async def aclose(self) -> None:
        with CancelScope(shield=True) as scope:
            if self._stdin:
                await self._stdin.aclose()
            if self._stdout:
                await self._stdout.aclose()
            if self._stderr:
                await self._stderr.aclose()

            try:
                await self.wait()
            except BaseException:
                self.kill()
                await self.wait()
                raise
I removed the scope.shield = False line because it causes an error during await self.wait(), likely due to the cancel scope no longer being shielded, which leads to a CancelledError and the process being killed prematurely.
@cicdw Additionally, modifying the prefect.utilities.processutils.open_process function can also allow the subprocess to exit properly. Here's the code I used:
@asynccontextmanager
async def open_process(
    command: list[str], **kwargs: Any
) -> AsyncGenerator[anyio.abc.Process, Any]:
    """
    Like `anyio.open_process` but with:
    - Support for Windows command joining
    - Termination of the process on exception during yield
    - Forced cleanup of process resources during cancellation
    """
    # Passing a string to open_process is equivalent to shell=True which is
    # generally necessary for Unix-like commands on Windows but otherwise should
    # be avoided
    if not TYPE_CHECKING:
        if not isinstance(command, list):
            raise TypeError(
                "The command passed to open process must be a list. You passed the command"
                f"'{command}', which is type '{type(command)}'."
            )

    if sys.platform == "win32":
        command = " ".join(command)
        process = await _open_anyio_process(command, **kwargs)
    else:
        process = await anyio.open_process(command, **kwargs)

    # if there's a creationflags kwarg and it contains CREATE_NEW_PROCESS_GROUP,
    # use SetConsoleCtrlHandler to handle CTRL-C
    win32_process_group = False
    if (
        sys.platform == "win32"
        and "creationflags" in kwargs
        and kwargs["creationflags"] & subprocess.CREATE_NEW_PROCESS_GROUP
    ):
        win32_process_group = True
        _windows_process_group_pids.add(process.pid)
        # Add a handler for CTRL-C. Re-adding the handler is safe as Windows
        # will not add a duplicate handler if _win32_ctrl_handler is
        # already registered.
        windll.kernel32.SetConsoleCtrlHandler(_win32_ctrl_handler, 1)

    try:
        # 1:
        # async with process:
        #     yield process
        yield process
    finally:
        # 2:
        # Ensure the process resource is closed. If not shielded from cancellation,
        # this resource can be left open and the subprocess output can appear after
        # the parent process has exited.
        with anyio.CancelScope(shield=True):
            await process.aclose()

        try:
            process.terminate()
            if sys.platform == "win32" and win32_process_group:
                _windows_process_group_pids.remove(process.pid)
        except OSError:
            # Occurs if the process is already terminated
            pass

        # 3:
        # # Ensure the process resource is closed. If not shielded from cancellation,
        # # this resource can be left open and the subprocess output can appear after
        # # the parent process has exited.
        # with anyio.CancelScope(shield=True):
        #     await process.aclose()
This includes three modifications that help ensure the subprocess exits gracefully, especially under cancellation scenarios.
Hi!
I am having similar issues.
I have an AWS EKS setup, and the workers are type process (not kubernetes), so a worker spawns subprocesses to execute flows.
Every time the worker pod receives a SIGTERM, the process does not exit gracefully. It stops immediately, leaving the currently executing flows as "Running" forever.
I can see in logs:
Received SIGTERM. Sending SIGINT to the process worker (PID 7)...
Received SIGINT. Sending SIGINT to the process worker (PID 7)...
But right after this, the process exits and no running tasks are gracefully stopped.
I have been investigating this for two days, reading issues, asking AI, and asking the Prefect Slack community, and I can't find clear instructions on how to fix or tackle the issue.
Have you found anything?
@sluceno I don't have a solution for this. It should be possible to enable heartbeats from the flow runs and then have a periodic task that finds flow runs that haven't sent heartbeats recently and remove them. You would have to account for other cases, though, like flow runs that haven't started yet and thus haven't sent any heartbeats.
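To make the periodic-task idea concrete, here is a rough, untested sketch using the Prefect client. It does not read heartbeat timestamps directly; instead it uses the time a run has spent in the Running state as a stand-in, so the cutoff and the crash_stuck_flow_runs helper are illustrative choices, not an official API:

# Hypothetical periodic sweep: find flow runs that have been Running longer
# than a cutoff and force them into a Crashed state. Run it from cron, a
# scheduled deployment, or similar.
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.states import Crashed

STUCK_AFTER = timedelta(minutes=30)  # assumed cutoff; tune to your longest flow


async def crash_stuck_flow_runs() -> None:
    async with get_client() as client:
        running = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.RUNNING])
                )
            )
        )
        now = datetime.now(timezone.utc)
        for run in running:
            # run.state.timestamp is when the run entered its current state
            if run.state and now - run.state.timestamp > STUCK_AFTER:
                await client.set_flow_run_state(
                    flow_run_id=run.id,
                    state=Crashed(message="No activity; worker presumed dead."),
                    force=True,
                )


if __name__ == "__main__":
    asyncio.run(crash_stuck_flow_runs())

As noted above, this still needs extra handling for runs that are legitimately long-running or that never started, so treat it as a stopgap until the worker itself marks its runs as Crashed.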