
Ensure infrastructure is always killed when job has crashed

Open marcm-ml opened this issue 9 months ago • 0 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the Prefect documentation for this issue.
  • [X] I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

I am basing this on the experience of working with the KubernetesWorker from prefect_kubernetes.

The current implementation of BaseWorker (self._submit_run_and_capture_errors) watches the infrastructure and relays back events and, where available, a status code. If the result from self.run contains a non-zero exit code, the flow run is marked as "Crashed". However, nothing in the code ensures that the infrastructure is actually killed.

The problem is that some workers implement a timeout that returns a non-zero exit code if the job takes longer than x seconds (for Kubernetes, this is job_watch_timeout_seconds).

I would expect the worker to also kill the infrastructure in that case, so that jobs do not keep running indefinitely. However, this is not the case.
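To illustrate the behavior described above, here is a minimal sketch that does not use Prefect at all. FakeJob, watch, and run_and_report are hypothetical names, and the -1 return value is just an arbitrary non-zero code chosen for the example. It only shows that abandoning the watch after a timeout leaves the job itself untouched.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeJob:
    """Hypothetical stand-in for infrastructure such as a Kubernetes pod."""

    duration: float  # how long the job would run if left alone
    running: bool = True

    async def wait(self) -> int:
        # Pretend to block until the job finishes on its own.
        await asyncio.sleep(self.duration)
        self.running = False
        return 0


async def watch(job: FakeJob, watch_timeout: float) -> int:
    """Return the job's exit code, or -1 if the watch times out.

    Only the watch is abandoned on timeout; nothing stops the job.
    """
    try:
        return await asyncio.wait_for(job.wait(), timeout=watch_timeout)
    except asyncio.TimeoutError:
        return -1  # assumed non-zero code, for illustration only


async def run_and_report(job: FakeJob, watch_timeout: float) -> str:
    # Mirrors the current logic: a non-zero code only changes the reported state.
    status_code = await watch(job, watch_timeout)
    return "Crashed" if status_code != 0 else "Completed"


job = FakeJob(duration=5.0)
state = asyncio.run(run_and_report(job, watch_timeout=0.05))
print(state, job.running)
```

In this toy model the run is reported as crashed while job.running is still true, which is the mismatch this issue is about.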

Proposal: Change function: https://github.com/PrefectHQ/prefect/blob/7c37407543c17b62756d46501f00fdeb9469951a/src/prefect/workers/base.py#L894-L950

to

    async def _submit_run_and_capture_errors(
        self, flow_run: "FlowRun", task_status: Optional[anyio.abc.TaskStatus] = None
    ) -> Union[BaseWorkerResult, Exception]:
        ...  # existing code up to the status_code check (unchanged)
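        if result.status_code != 0:
            # Best-effort cleanup: a non-zero code can come from a watch timeout,
            # in which case the infrastructure may still be running.
            try: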
                await self.kill_infrastructure(
                    infrastructure_pid=flow_run.infrastructure_pid,
                    configuration=configuration,
                )
            except NotImplementedError:
                self._logger.error(
                    f"Worker type {self.type!r} does not support killing created "
                    "infrastructure. Cleanup cannot be guaranteed."
                )
            await self._propose_crashed_state(
                flow_run,
                (
                    "Flow run infrastructure exited with non-zero status code"
                    f" {result.status_code}."
                ),
            )

        self._emit_flow_run_executed_event(result, configuration, submitted_event)

        return result
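The control flow of the proposal can be tried out in isolation with a toy model. This is only a sketch under stated assumptions: ToyWorker, KillableWorker, submit, and the proposed_state attribute are hypothetical and are not part of Prefect. The only thing borrowed from the proposal is the pattern of attempting a kill on a non-zero status code and tolerating NotImplementedError.

```python
import asyncio
import logging
from typing import List, Optional

logger = logging.getLogger("toy_worker")


class ToyWorker:
    """Hypothetical worker that cannot kill its infrastructure."""

    type = "toy"

    def __init__(self, status_code: int) -> None:
        self.status_code = status_code  # exit code that run() will report
        self.killed: List[str] = []  # pids that were cleaned up
        self.proposed_state: Optional[str] = None

    async def run(self) -> int:
        return self.status_code

    async def kill_infrastructure(self, infrastructure_pid: str) -> None:
        raise NotImplementedError

    async def submit(self, infrastructure_pid: str) -> int:
        status_code = await self.run()
        if status_code != 0:
            # Try to clean up before reporting the crash.
            try:
                await self.kill_infrastructure(infrastructure_pid)
            except NotImplementedError:
                logger.error(
                    "Worker type %r does not support killing created "
                    "infrastructure. Cleanup cannot be guaranteed.",
                    self.type,
                )
            self.proposed_state = "Crashed"
        return status_code


class KillableWorker(ToyWorker):
    """Hypothetical worker that records which pids it killed."""

    type = "killable"

    async def kill_infrastructure(self, infrastructure_pid: str) -> None:
        self.killed.append(infrastructure_pid)


killable = KillableWorker(status_code=-1)
asyncio.run(killable.submit("pod-1"))

plain = ToyWorker(status_code=1)
asyncio.run(plain.submit("pod-2"))

healthy = KillableWorker(status_code=0)
asyncio.run(healthy.submit("pod-3"))

print(killable.killed, plain.killed, healthy.killed)
```

In this model the crashed state is proposed whether or not the kill is supported, and a zero status code never triggers a kill. A real implementation would probably also need to decide how to handle other errors raised during cleanup, which this sketch does not cover.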

Reproduction

from time import sleep
from prefect import flow

@flow
def my_flow():
    sleep(100000000000000)


# Create a deployment using Kubernetes infrastructure and set job_watch_timeout_seconds to one minute or similar.
# The flow run is marked as crashed after 1 minute, but the Kubernetes pod keeps running until the sleep call finishes.

Error

None

Versions

Any

Additional context

Willing to make PR

marcm-ml • May 22 '24 15:05