Deleting a deployment does not delete corresponding scheduled runs
Bug summary
Deleting a deployment does not always delete all of the scheduled runs that correspond to it. If those runs are not deleted, the system tries to execute them and, once the deployment cannot be found, falls into an infinite loop. The problem occurs roughly one time in ten, with no apparent pattern.
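For context, a minimal sketch of the kind of setup involved (the flow name, deployment name, and schedule below are placeholders, not our exact configuration): serve a flow on a schedule, then delete the deployment.

```python
from prefect import flow


@flow
def my_flow() -> None:
    print("hello")


if __name__ == "__main__":
    # Serving creates a deployment with a schedule; the server then
    # auto-schedules upcoming runs for it.
    my_flow.serve(name="example-deployment", cron="*/5 * * * *")
```

After deleting the deployment (for example with `prefect deployment delete my-flow/example-deployment`), the scheduled runs created for it should disappear as well, but occasionally some remain and can be seen with `prefect flow-run ls`.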
Version info (`prefect version` output)

```
Version: 3.0.0
API version: 0.8.4
Python version: 3.12.5
Git commit: c40d069d
Built: Tue, Sep 3, 2024 11:13 AM
OS/Arch: linux/aarch64
Profile: ephemeral
Server type: server
Pydantic version: 2.8.2
```
Additional context
No response
Noticed the same on Prefect 3.0.3.
```
Version: 3.0.3
API version: 0.8.4
Python version: 3.10.11
Git commit: d9c51bc3
Built: Fri, Sep 20, 2024 8:52 AM
OS/Arch: linux/x86_64
Profile: local
Server type: server
Pydantic version: 2.9.2
Integrations:
  prefect-docker: 0.6.1
  prefect-aws: 0.5.0
```
As you said, it doesn't happen consistently, just every so often. In the meantime, is there an easy way to delete scheduled runs whose deployment no longer exists?
To give some more context, there are no error logs from the Prefect server, and I use Postgres as the database.
Hello! Any news on this, or additional info you could give us? Is this indeed a bug you're aware of, or are we missing something on our side?
@0leg-st I'm not 100% sure, since I haven't experimented with it, but I suspect the issue happens when the new deployment somehow is no longer connected to the old one. In my case it happened because I changed the DockerImage name in the deploy call.
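For illustration, roughly the pattern I mean, with placeholder names (this is a sketch, not our real project): the deployment keeps its name, but the image passed to `deploy` changes between releases.

```python
from prefect import flow
from prefect.docker import DockerImage


@flow
def my_flow() -> None:
    ...


if __name__ == "__main__":
    # First release: deploy with one image name (build/push skipped here,
    # this is only an illustration).
    my_flow.deploy(
        name="my-deployment",
        work_pool_name="docker-pool",
        image=DockerImage(name="registry.example.com/old-image", tag="latest"),
        build=False,
        push=False,
    )
    # Later release: same deployment name, different image name. It was
    # around this kind of change that scheduled runs were left behind.
    my_flow.deploy(
        name="my-deployment",
        work_pool_name="docker-pool",
        image=DockerImage(name="registry.example.com/new-image", tag="latest"),
        build=False,
        push=False,
    )
```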
~~So there is one thing that could be happening here: right now the logic only deletes auto-scheduled runs, so any run created manually would be left hanging. I'll look into whether that's safe to update.~~ (this is incorrect; all scheduled runs are deleted)
The DELETE request is tied to the same deletion mechanism, so if the request succeeded I don't know why runs would otherwise be left hanging.
We're also seeing this in our Prefect 3 instance: when we delete deployments of a flow, the corresponding scheduled flow runs are sometimes not removed. This has the unfortunate downside of bricking the worker responsible for them, in that:
- We delete a deployment
- This does not always delete the scheduled flow runs attached to it
- These flow runs try to execute
- The worker looks up the deployment, and cannot find it
- The worker begins to loop-crash
- Every flow assigned to that worker gets blocked as the worker is dead
I would strongly recommend making the (process) worker more resilient, so that a 404 when fetching the deployment moves the flow run into a CRASHED state instead of crashing the worker.
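For illustration, the kind of defensive handling I have in mind, sketched against the public client API rather than the actual worker internals (the function name here is made up):

```python
from prefect import get_client
from prefect.client.schemas.objects import FlowRun
from prefect.exceptions import ObjectNotFound
from prefect.states import Crashed


async def submit_defensively(flow_run: FlowRun) -> bool:
    """Return True if the run's deployment still exists; otherwise mark the
    run CRASHED instead of letting the worker itself crash-loop."""
    async with get_client() as client:
        try:
            await client.read_deployment(flow_run.deployment_id)
        except ObjectNotFound:
            await client.set_flow_run_state(
                flow_run.id,
                state=Crashed(message="Deployment no longer exists."),
                force=True,
            )
            return False
    return True
```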
In the interim, until this issue gets fixed, we have a flow that finds orphaned flow runs and then deletes them. It looks like this:
```python
from uuid import UUID

from prefect import flow, get_client, task
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.logging import get_logger


@task(retries=2, retry_delay_seconds=10, task_run_name="Find Orphaned Flow Runs")
async def find_orphaned_flows() -> list[UUID]:
    async with get_client() as client:
        # IDs of the deployments and flows that still exist on the server.
        deployments = await client.read_deployments()
        deployment_ids = {deployment.id for deployment in deployments}
        flows = await client.read_flows()
        flow_ids = {flow.id for flow in flows}
        # All flow runs still in a SCHEDULED state.
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.SCHEDULED]),
                ),
            )
        )
    # A run is orphaned if its deployment or flow no longer exists.
    orphans = [r for r in flow_runs if r.deployment_id not in deployment_ids or r.flow_id not in flow_ids]
    return [run.id for run in orphans]


@task(retries=2, retry_delay_seconds=10, task_run_name="Cancel Flow Run {run_id}")
async def delete_flow_run(run_id: UUID):
    async with get_client() as client:
        logger = get_logger()
        logger.info(f"Deleting flow with ID: {run_id}")
        await client.delete_flow_run(run_id)


@flow()
async def reaper():
    flow_runs_to_delete = await find_orphaned_flows()
    delete_flow_run.map(flow_runs_to_delete).wait()
```
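To keep it running regularly, we schedule the reaper flow itself; something like the following works (the deployment name and interval here are placeholders):

```python
if __name__ == "__main__":
    # Serve the reaper so it runs on a fixed interval (in seconds).
    reaper.serve(name="orphaned-flow-run-reaper", interval=3600)
```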
And to highlight that this is not an infrequent issue, here is a screenshot of it running yesterday and deleting 15 orphaned flow runs, each of which would have brought the worker to a standstill.
Information taken from the slack conversation here: https://prefect-community.slack.com/archives/CL09KU1K7/p1740470191308529
Hi @Samreay - this PR should get released this week and will prevent the worker from crash-looping by cancelling runs associated with deleted deployments.
Hi @cicdw, I noticed the issue continues to happen after this PR:
https://github.com/PrefectHQ/prefect/pull/17313/files
Let me first explain the behavior:
- When workers try to submit a scheduled flow run whose deployment has somehow been deleted, they hit the `except (ObjectNotFound, AttributeError)` branch in `_submit_run`, which calls `_mark_flow_run_as_cancelled`:
```diff
 async def _submit_run(self, flow_run: "FlowRun") -> None:
     """
     Submits a given flow run for execution by the worker.
     """
     run_logger = self.get_flow_run_logger(flow_run)
     try:
         await self.client.read_deployment(getattr(flow_run, "deployment_id"))
+    except (ObjectNotFound, AttributeError):
         self._logger.exception(
             f"Deployment {flow_run.deployment_id} no longer exists. "
             f"Flow run {flow_run.id} will not be submitted for"
             " execution"
         )
         self._submitting_flow_run_ids.remove(flow_run.id)
+        await self._mark_flow_run_as_cancelled(
+            flow_run,
+            state_updates=dict(
+                message=f"Deployment {flow_run.deployment_id} no longer exists, cancelled run."
+            ),
+        )
         return
```
- The bug then surfaces inside `_mark_flow_run_as_cancelled`:
```python
async def _mark_flow_run_as_cancelled(
    self, flow_run: "FlowRun", state_updates: dict[str, Any] | None = None
) -> None:
    state_updates = state_updates or {}
    state_updates.setdefault("name", "Cancelled")
    state_updates.setdefault("type", StateType.CANCELLED)
    if TYPE_CHECKING:
        assert flow_run.state
    # The line below fails when flow_run.state is None:
    state = flow_run.state.model_copy(update=state_updates)
    await self.client.set_flow_run_state(flow_run.id, state, force=True)
    # Do not remove the flow run from the cancelling set immediately because
    # the API caches responses for the `read_flow_runs` and we do not want to
    # duplicate cancellations.
    await self._schedule_task(
        60 * 10, self._cancelling_flow_run_ids.remove, flow_run.id
    )
```
Error log:

```
Traceback (most recent call last):
  File "/opt/prefect/current/.venv/lib/python3.11/site-packages/prefect/workers/base.py", line 1004, in _submit_run
    await self._mark_flow_run_as_cancelled(
  File "/opt/prefect/current/.venv/lib/python3.11/site-packages/prefect/workers/base.py", line 1255, in _mark_flow_run_as_cancelled
    state = flow_run.state.model_copy(update=state_updates)
            ^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'model_copy'
```
`NoneType` here means `flow_run.state` is `None`. The `assert flow_run.state` guard above never runs, because `TYPE_CHECKING` (imported from `typing`) is always `False` at runtime.
I suggest ensuring `flow_run.state` is not `None` before calling `model_copy`.
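For illustration, the kind of guard I mean, written as a standalone helper (the helper name is made up; the actual change is in the PR below):

```python
from typing import Any

from prefect.client.schemas.objects import FlowRun, State, StateType


def build_cancelled_state(flow_run: FlowRun, state_updates: dict[str, Any]) -> State:
    """Hypothetical helper: tolerate flow_run.state being None."""
    state_updates.setdefault("name", "Cancelled")
    state_updates.setdefault("type", StateType.CANCELLED)
    if flow_run.state is not None:
        # Normal path: copy the existing state and apply the cancellation updates.
        return flow_run.state.model_copy(update=state_updates)
    # No prior state to copy from: build a fresh Cancelled state instead.
    return State(**state_updates)
```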
#17536
Thank you for debugging this and submitting a PR @lelouvincx! I'm surprised to see that flow_run.state is None there but either way your fix should do the trick!
Good job! So this issue has been fixed, right? Can we close it? @lelouvincx
@sivagao yes, we can close this.