prefect
Ensure infrastructure is always killed when job has crashed
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
I am basing this on the experience of working with the KubernetesWorker from prefect_kubernetes.
The current implementation of BaseWorker (self._submit_run_and_capture_errors) watches the infrastructure and relays back events and, where available, a status code. If the result from self.run contains a non-zero exit code, the flow run is marked as "Crashed". However, the code never ensures that the infrastructure is actually killed.
The problem is that some workers implement a timeout that returns a non-zero exit code if the job takes longer than a configured number of seconds (for Kubernetes this is job_watch_timeout_seconds).
I would expect that the worker would also kill the infrastructure to ensure that jobs are not running forever. However, this is not the case.
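To illustrate the failure mode without a cluster, here is a minimal sketch using plain asyncio. It is not Prefect code: `long_running_job`, `watch`, and `demo` are hypothetical stand-ins, and the `-1` status code is an assumed value for "watch timed out". It shows that a watcher can give up and report a non-zero status while the thing it was watching keeps running.

```python
import asyncio
from typing import Tuple


async def long_running_job() -> None:
    """Stand-in for the flow run's pod: sleeps far longer than the watch timeout."""
    await asyncio.sleep(60)


async def watch(job: "asyncio.Task[None]", timeout_seconds: float) -> int:
    """Hypothetical watcher: returns 0 if the job finishes in time, -1 otherwise.

    asyncio.shield keeps the timeout from cancelling the job itself, which
    mirrors a watcher that stops observing without touching the infrastructure.
    """
    try:
        await asyncio.wait_for(asyncio.shield(job), timeout=timeout_seconds)
        return 0
    except asyncio.TimeoutError:
        return -1


async def demo() -> Tuple[int, bool]:
    job = asyncio.create_task(long_running_job())
    status_code = await watch(job, timeout_seconds=0.05)
    still_running = not job.done()
    # Clean up the stand-in job so the demo exits promptly.
    job.cancel()
    try:
        await job
    except asyncio.CancelledError:
        pass
    return status_code, still_running


status_code, still_running = asyncio.run(demo())
print(status_code, still_running)
```

In this sketch the watcher reports a failure, yet the job is only stopped because the demo cancels it explicitly afterwards. That explicit stop is the step the worker currently does not perform.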
Proposal: change the function at https://github.com/PrefectHQ/prefect/blob/7c37407543c17b62756d46501f00fdeb9469951a/src/prefect/workers/base.py#L894-L950
to:
async def _submit_run_and_capture_errors(
    self, flow_run: "FlowRun", task_status: anyio.abc.TaskStatus = None
) -> Union[BaseWorkerResult, Exception]:
    ....  # code until check of status_code
    if result.status_code != 0:
        try:
            await self.kill_infrastructure(
                infrastructure_pid=flow_run.infrastructure_pid,
                configuration=configuration,
            )
        except NotImplementedError:
            self._logger.error(
                f"Worker type {self.type!r} does not support killing created "
                "infrastructure. Cleanup cannot be guaranteed."
            )
        await self._propose_crashed_state(
            flow_run,
            (
                "Flow run infrastructure exited with non-zero status code"
                f" {result.status_code}."
            ),
        )

    self._emit_flow_run_executed_event(result, configuration, submitted_event)

    return result
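The control flow of the proposal can be exercised in isolation with a small, self-contained sketch. Everything here is hypothetical and simplified: `FakeWorker`, `FakeResult`, `handle_result`, and the `supports_kill` flag are illustration-only names, `kill_infrastructure` takes only an `infrastructure_pid` (the real method also receives a configuration), and log messages are collected in a list instead of going through a logger or the Prefect API.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List


@dataclass
class FakeResult:
    """Hypothetical stand-in for a worker result carrying a status code."""
    status_code: int


@dataclass
class FakeWorker:
    """Hypothetical worker; not a Prefect class."""
    type: str = "fake"
    supports_kill: bool = True
    log: List[str] = field(default_factory=list)

    async def kill_infrastructure(self, infrastructure_pid: str) -> None:
        # Workers that cannot kill infrastructure signal it with NotImplementedError.
        if not self.supports_kill:
            raise NotImplementedError
        self.log.append(f"killed {infrastructure_pid}")

    async def handle_result(self, result: FakeResult, infrastructure_pid: str) -> None:
        if result.status_code != 0:
            # Try to clean up first, then record the crash either way.
            try:
                await self.kill_infrastructure(infrastructure_pid)
            except NotImplementedError:
                self.log.append(
                    f"worker type {self.type!r} cannot kill infrastructure"
                )
            self.log.append(f"crashed with status code {result.status_code}")


async def demo():
    with_kill = FakeWorker()
    await with_kill.handle_result(FakeResult(-1), "pod-1")

    without_kill = FakeWorker(supports_kill=False)
    await without_kill.handle_result(FakeResult(-1), "pod-2")

    clean_exit = FakeWorker()
    await clean_exit.handle_result(FakeResult(0), "pod-3")

    return with_kill.log, without_kill.log, clean_exit.log


with_kill_log, without_kill_log, clean_exit_log = asyncio.run(demo())
```

The crashed state is recorded whether or not the kill succeeds, so a worker type without kill support behaves as it does today apart from the extra log message. A zero status code skips both steps.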
Reproduction
from time import sleep

from prefect import flow


@flow
def my_flow():
    sleep(100000000000000)


# Create a deployment using Kubernetes infrastructure and set job_watch_timeout_seconds to one minute or similar.
# The job will be marked as crashed after 1 minute, but the Kubernetes pod will keep running until the sleep call finishes.
Error
None
Versions
Any
Additional context
Willing to make PR