When a process worker goes down, mark running tasks/flows as "Crashed"

leesavoie-voltaiq opened this issue 11 months ago • 10 comments

Describe the current behavior

When a process worker is shut down, any flows it was running at the time stay in the "Running" state indefinitely. If the work pool has a concurrency limit, that limit is effectively reduced by the number of hanging flows.

To reproduce:

  1. Install Docker and Docker Compose
  2. Copy the files below into the same directory
  3. Run docker compose up. If this fails, it is probably because the database didn't have enough time to initialize. Simply wait a few seconds and run docker compose up again.
  4. Wait for a flow run to be triggered and for the worker to start it.
  5. While the flow is still running, hit Ctrl-C to stop the containers.
  6. Run docker compose up again and wait for everything to initialize.
  7. View the Prefect server UI at http://localhost:4200. You will see a flow run stuck in the "Running" state that never ends. Over time, new flow runs will be scheduled, but they will never run because the work pool has a concurrency limit of 1.

Files used (put these in the same directory):

docker-compose.yml:

version: '3.7'
services:
    prefect_test_db:
        image: postgres:17.0-alpine
        volumes:
            - prefect_test_db_data:/var/lib/postgresql/data
        environment:
            - POSTGRES_NAME=postgres
            - POSTGRES_USER=postgres
            - POSTGRES_PASSWORD=postgres
            - POSTGRES_DB=prefect

    prefect_test_server:
        image: prefecthq/prefect:3-python3.9
        environment:
            - PREFECT_HOME=/data
            - PREFECT_API_DATABASE_CONNECTION_URL=postgresql+asyncpg://postgres:postgres@prefect_test_db/prefect
        command: prefect server start --host 0.0.0.0
        volumes:
            - prefect_test_data:/data
        ports:
            - 4200:4200
        depends_on:
            - prefect_test_db

    prefect_test_worker:
        image: prefecthq/prefect:3-python3.9
        environment:
            - PREFECT_API_URL=http://prefect_test_server:4200/api
            - PREFECT_LOGGING_LOG_PRINTS=true
        command: >
            sh -c "sleep 5 &&
                   prefect work-pool create test_pool --type process &&
                   prefect work-pool set-concurrency-limit test_pool 1 &&
                   prefect deploy --prefect-file /data/prefect.yaml &&
                   prefect worker start --pool test_pool --type process"
        volumes:
            - .:/data
        depends_on:
            - prefect_test_server

volumes:
    prefect_test_db_data:
    prefect_test_data:

prefect.yaml:

# Welcome to your prefect.yaml file! You can use this file for storing and managing
# configuration for deploying your flows. We recommend committing this file to source
# control along with your flow code.

# Generic metadata about this project
name: prefect_deployments
prefect-version: 3.1.12

# build section allows you to manage and build docker images
build: null

# push section allows you to manage if and how this project is uploaded to remote locations
push: null

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /data

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: "test_deployment"
  version: "1"
  tags: []
  description: "Run the test flow"
  schedule:
    interval: 20
  flow_name: "test_flow"
  entrypoint: /data/flows.py:test_flow
  parameters: {}
  work_pool:
    name: "test_pool"
    work_queue_name: "test_pool"
    job_variables: {}

flows.py:

from prefect import flow
from time import sleep


@flow
def test_flow():
    print("Running flow")
    sleep(15)

Describe the proposed behavior

When a process worker crashes or is shut down gracefully, mark any flows it was running as "Crashed".

Example Use

This is a common problem during development when using docker compose to bring up and shut down a development stack with long-running processes. Anything that is still running when docker compose down is executed will hang indefinitely.

Additional context

Some context from @cicdw on Slack:

Our current recommendation for these situations is to setup a zombie-killer automation as described here: https://docs.prefect.io/v3/automate/events/automations-triggers#detect-and-respond-to-zombie-flows It was actually an intentional design decision to not couple submitted work to worker health as a form of fault tolerance, but honestly this decision doesn't make much sense for the Process Worker specifically because the work is in fact coupled; I think we can look to add an attempt to cancel subprocesses when that particular worker shuts down gracefully.
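
For reference, the zombie-killer approach from that docs page boils down to two pieces: enable flow run heartbeats (the PREFECT_RUNNER_HEARTBEAT_FREQUENCY setting, e.g. 30 seconds) and register an automation that marks a flow run as Crashed when its heartbeats stop arriving. A rough Python sketch of such an automation follows; the class and field names are recalled from the Prefect 3 docs and may differ slightly in your version, so treat the linked page as authoritative:

from datetime import timedelta

from prefect.automations import Automation
from prefect.client.schemas.objects import StateType
from prefect.events.actions import ChangeFlowRunState
from prefect.events.schemas.automations import EventTrigger, Posture
from prefect.events.schemas.events import ResourceSpecification

# Fires when a flow run emits a heartbeat but then goes 90 seconds without
# another heartbeat or a terminal-state event, and marks that run as Crashed.
zombie_killer = Automation(
    name="Crash zombie flows",
    trigger=EventTrigger(
        after={"prefect.flow-run.heartbeat"},
        expect={
            "prefect.flow-run.heartbeat",
            "prefect.flow-run.Completed",
            "prefect.flow-run.Failed",
            "prefect.flow-run.Cancelled",
            "prefect.flow-run.Crashed",
        },
        match=ResourceSpecification({"prefect.resource.id": ["prefect.flow-run.*"]}),
        for_each={"prefect.resource.id"},
        posture=Posture.Proactive,
        threshold=1,
        within=timedelta(seconds=90),
    ),
    actions=[
        ChangeFlowRunState(
            state=StateType.CRASHED,
            message="Flow run marked as crashed due to missing heartbeats.",
        )
    ],
)

if __name__ == "__main__":
    zombie_killer.create()

As far as I understand, heartbeat events are not emitted unless PREFECT_RUNNER_HEARTBEAT_FREQUENCY is set on the flow-run side, so the automation never arms without it.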

leesavoie-voltaiq avatar Jan 16 '25 18:01 leesavoie-voltaiq

@leesavoie-voltaiq I'm having trouble reproducing - if I kill my worker with CTRL+C, I get the same Crashed state as if I killed a served flow with CTRL+C; what signal is being sent to kill the worker in your case / any other information that might help me reproduce?

cicdw avatar Jan 23 '25 17:01 cicdw

Maybe this only applies to workers that go down cleanly. My process is something like this:

  1. Run docker compose up
  2. Wait for a flow run to start
  3. Hit Ctrl+C in the same terminal where I ran docker compose up (thus ending the docker compose session)

I think the difference may be that I'm bringing down the worker cleanly and you are killing it.

I'm running this on an M2 Mac, if that makes a difference.

leesavoie-voltaiq avatar Jan 23 '25 17:01 leesavoie-voltaiq

I am having a similar problem. I am also using a work pool of type "process". My expectation is that if a worker fails while running a flow, the execution would be picked up by another available worker. I find it jarring that this is not the default behavior.

anibalrivero avatar Feb 17 '25 10:02 anibalrivero

I have also encountered this issue. While debugging, I found that the behavior is caused by anyio._backends._asyncio.Process (anyio version 4.9.0). The key part of the code is:

async def aclose(self) -> None:
    with CancelScope(shield=True) as scope:
        if self._stdin:
            await self._stdin.aclose()
        if self._stdout:
            await self._stdout.aclose()
        if self._stderr:
            await self._stderr.aclose()

        scope.shield = False
        try:
            await self.wait()
        except BaseException:  # This captures CancelledError, causing the subprocess to be killed immediately
            scope.shield = True
            self.kill()
            await self.wait()
            raise

Because CancelledError is caught here, the subprocess is immediately terminated, preventing it from performing any exit cleanup.

I tried modifying this class by removing the scope.shield assignments, and with that change, the process was able to exit cleanly. I'm not sure if this is an issue with anyio, but I hope this information helps.
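
To make the failure mode easier to see in isolation, here is a small, self-contained anyio sketch (not Prefect code) of the same pattern: the shield protects the stream cleanup, but once scope.shield is flipped back to False, a pending cancellation from the parent lands on the wait-style await, and the except BaseException branch is where the real Process.aclose() kills the child:

# Standalone illustration of the cancellation path described above.
# Assumes anyio 4.x; anyio.sleep stands in for `await self.wait()`.
import anyio


async def cleanup_like_aclose():
    with anyio.CancelScope(shield=True) as scope:
        # ... the stream aclose() calls happen here, protected by the shield ...
        scope.shield = False  # cleanup becomes cancellable again
        try:
            await anyio.sleep(10)  # stands in for waiting on the subprocess
        except BaseException:  # the pending CancelledError is delivered here
            scope.shield = True
            print("cancelled -> the real aclose() would call self.kill() here")
            raise


async def main():
    async with anyio.create_task_group() as tg:
        tg.start_soon(cleanup_like_aclose)
        await anyio.sleep(0.1)
        tg.cancel_scope.cancel()  # simulates the worker shutting down


anyio.run(main)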

wangxing-git avatar Apr 02 '25 08:04 wangxing-git

@wangxing-git that is very helpful thank you!

cicdw avatar Apr 03 '25 20:04 cicdw

@wangxing-git would you mind testing against this branch and letting me know if it resolves the issue for you?

Also, if anyone else could test against this branch it would be appreciated!

cicdw avatar Apr 03 '25 21:04 cicdw

The branch does not resolve the issue for me. Below is a custom Process implementation I used to work around the problem:

from anyio import CancelScope
from anyio._backends._asyncio import Process


class CustomProcess(Process):
    async def aclose(self) -> None:
        with CancelScope(shield=True) as scope:
            if self._stdin:
                await self._stdin.aclose()
            if self._stdout:
                await self._stdout.aclose()
            if self._stderr:
                await self._stderr.aclose()

            try:
                await self.wait()
            except BaseException:
                self.kill()
                await self.wait()
                raise

I removed the scope.shield = False line because it causes an error during await self.wait(): with the cancel scope no longer shielded, a pending cancellation likely raises a CancelledError there and the process is killed prematurely.

wangxing-git avatar Apr 07 '25 01:04 wangxing-git

@cicdw Additionally, modifying the prefect.utilities.processutils.open_process function can also allow the subprocess to exit properly. Here's the code I used:

@asynccontextmanager
async def open_process(
    command: list[str], **kwargs: Any
) -> AsyncGenerator[anyio.abc.Process, Any]:
    """
    Like `anyio.open_process` but with:
    - Support for Windows command joining
    - Termination of the process on exception during yield
    - Forced cleanup of process resources during cancellation
    """
    # Passing a string to open_process is equivalent to shell=True which is
    # generally necessary for Unix-like commands on Windows but otherwise should
    # be avoided
    if not TYPE_CHECKING:
        if not isinstance(command, list):
            raise TypeError(
                "The command passed to open process must be a list. You passed the command"
                f"'{command}', which is type '{type(command)}'."
            )

    if sys.platform == "win32":
        command = " ".join(command)
        process = await _open_anyio_process(command, **kwargs)
    else:
        process = await anyio.open_process(command, **kwargs)

    # if there's a creationflags kwarg and it contains CREATE_NEW_PROCESS_GROUP,
    # use SetConsoleCtrlHandler to handle CTRL-C
    win32_process_group = False
    if (
        sys.platform == "win32"
        and "creationflags" in kwargs
        and kwargs["creationflags"] & subprocess.CREATE_NEW_PROCESS_GROUP
    ):
        win32_process_group = True
        _windows_process_group_pids.add(process.pid)
        # Add a handler for CTRL-C. Re-adding the handler is safe as Windows
        # will not add a duplicate handler if _win32_ctrl_handler is
        # already registered.
        windll.kernel32.SetConsoleCtrlHandler(_win32_ctrl_handler, 1)

    try:
        # 1:
        # async with process:
        #     yield process
        yield process
    finally:
        # 2:
        # Ensure the process resource is closed. If not shielded from cancellation,
        # this resource can be left open and the subprocess output can appear after
        # the parent process has exited.
        with anyio.CancelScope(shield=True):
            await process.aclose()

        try:
            process.terminate()
            if sys.platform == "win32" and win32_process_group:
                _windows_process_group_pids.remove(process.pid)

        except OSError:
            # Occurs if the process is already terminated
            pass

        # 3:
        # # Ensure the process resource is closed. If not shielded from cancellation,
        # # this resource can be left open and the subprocess output can appear after
        # # the parent process has exited.
        # with anyio.CancelScope(shield=True):
        #     await process.aclose()

This includes three modifications that help ensure the subprocess exits gracefully, especially under cancellation scenarios.
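
A quick way to check whether any of these patches actually let the child exit cleanly is to swap the repro's flows.py for a probe flow that reports when it is interrupted. This is a hypothetical helper, not something from the original reports, and Prefect's engine may install its own SIGTERM handling first; but if either the handler or the finally block logs during shutdown, the subprocess was signalled rather than killed outright:

# Hypothetical probe flow (a drop-in replacement for flows.py in the repro above).
import signal
import sys
from time import sleep

from prefect import flow


def _handle_sigterm(signum, frame):
    print(f"Received signal {signum}; exiting gracefully")
    sys.exit(1)


# Registered at import time, i.e. in the worker's flow-run subprocess.
signal.signal(signal.SIGTERM, _handle_sigterm)


@flow(log_prints=True)
def test_flow():
    try:
        print("Running flow")
        sleep(60)
    finally:
        print("Flow body exited; cleanup ran")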

wangxing-git avatar Apr 07 '25 07:04 wangxing-git

Hi!

I am having similar issues.

I have an AWS EKS setup; the workers are of type process (not kubernetes), so a worker spawns subprocesses to execute flows.

Every time the worker pod receives a SIGTERM, the process does not exit gracefully. It stops immediately, leaving the flows it was executing in the "Running" state forever.

I can see in logs:

Received SIGTERM. Sending SIGINT to the process worker (PID 7)...
Received SIGINT. Sending SIGINT to the process worker (PID 7)...   

But right after this, the process exits and no running tasks are gracefully stopped.

I have been investigating this for two days, reading issues, asking AI, and asking in the Prefect Slack community, and I can't find clear instructions on how to fix or tackle the issue.

Do you have any findings?

sluceno avatar Nov 15 '25 09:11 sluceno

@sluceno I don't have a solution for this. It should be possible to enable heartbeats for the flow runs and then have a periodic task that finds flow runs that haven't sent heartbeats recently and removes them. You would have to account for other cases, though, like flow runs that haven't started yet and thus haven't sent any heartbeats.
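
For what it's worth, a rough sketch of that kind of periodic cleanup using the Prefect client is below. To keep it simple it ignores heartbeats entirely and just treats any flow run that has been Running longer than a cutoff as orphaned, so the cutoff must comfortably exceed your longest legitimate run; the threshold and names are illustrative, not an official recipe:

# Hypothetical cleanup script: marks any flow run that has been "Running"
# longer than MAX_RUNTIME as Crashed. Run it on a schedule (cron, a Prefect
# deployment, etc.). MAX_RUNTIME is an assumption -- tune it for your workloads.
import asyncio
from datetime import datetime, timedelta, timezone

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.states import Crashed

MAX_RUNTIME = timedelta(minutes=30)


async def crash_stale_flow_runs() -> None:
    cutoff = datetime.now(timezone.utc) - MAX_RUNTIME
    async with get_client() as client:
        running = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.RUNNING])
                )
            )
        )
        for run in running:
            if run.start_time and run.start_time < cutoff:
                await client.set_flow_run_state(
                    flow_run_id=run.id,
                    state=Crashed(message="No worker appears to own this run."),
                    force=True,
                )


if __name__ == "__main__":
    asyncio.run(crash_stale_flow_runs())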

leesavoie-voltaiq avatar Nov 21 '25 17:11 leesavoie-voltaiq