Deleting a flow or task run should clear any consumed concurrency limits
Opened from the Prefect Public Slack Community
emil.ostergaard: Hello, I have problems with prefect cloud 2.0.
We use kubernetes flow runner, and a dask task runner.
Friday (8/7-2022), I had a flow run which I wanted to abort.
I attempted to use the delete functionality in the UI, thinking it would
delete all resources related to the flow_run, including the kubernetes job etc.
It did not remove the kubernetes job, so I removed this manually.
The issue is concurrency-limits: The tasks launched by this flow has a tag, with a concurrency limit.
It appears the task data associated with the deleted flow run was not removed from prefect storage. For instance, if I try:
prefect concurrency-limit inspect my-tag
It shows a bunch of active task ids, even though nothing is running in k8s.
This causes an unfortunate issue where any new flow runs, for this flow, will never start tasks, because prefect thinks the concurrency-limit is hit, due to these zombie tasks.
However, I can not seem to find a way to manually clean up these task ids, which means this flow is dead. Any help is appreciated!
anna: Deleting a flow run will delete only the flow run, it will not terminate any external resources
Due to a hybrid model, Prefect doesn't have direct access to your infra, which is why terminating resources this way is difficult
Let me open an issue to investigating the best approach for such zombie tasks
<@ULVA73B9P> open "Investigate the right approach for cleaning up zombie task runs caused by an infrastructure crash to free up concurrency limit slots"
Original thread can be found here.
@Oliver_Mannion: Hiya does Prefect 2 have the concept of heartbeats and the Zombie Killer?
@Anna_Geller: Why are you asking? Did you stumble across some issue that made you think about those concepts? Ideally, you shouldn't have to worry about those as this is more of an implementation detail
@Oscar_Björhn: I've thought about it too. Our VM agent host went down once and the flows that were in-progress at the time never seemed to get "cleaned up" in the gui, several days later they still reported that they were in progress.
@Anna_Geller: Thanks Oscar, I believe what you're describing is a related but not entirely the same problem of infrastructure crashes. I'm AFK but I'll set a reminder to open an issue for both on Monday. Thanks a lot to you both
@Oscar_Björhn: Hardly a time-sensitive issue, at least not for me. Otherwise i would have created an issue myself. Appreciated though!
@Anna_Geller: It seems that both use cases are the same: the agent or flow run infrastructure crashes and the flow run and possibly also task runs in progress should be cleaned up (e.g. by setting their state to Crashed/Failed) to give more precise information that the current flow/task runs are in fact no longer Running but Crashed
I think we should probably add hooks to deletion of flow and task runs to free concurrency slots.
@madkinsz In addition, what about a CLI command that clears a specific concurrency limit?
Hitting this issue as well. This practically makes concurrency limits unusable. Please find steps to reproduce below.
- Run the flow below locally
- Quit with ctrl-c
- Running
prefect concurrency-limit inspect stallwill show 5 zombie tasks
Furthermore, if you are logged in to Prefect Cloud and navigate to the "Concurrency Limits" page, you will be able to delete the zombie task runs but the concurrency slots will not be released. Even with an empty set of tasks the active concurrency slot number will remain the same and no new tasks will be scheduled.

import asyncio
from prefect import task, flow, get_client
@task(tags=['stall'])
async def stall(_: int):
while True:
await asyncio.sleep(1)
@flow
async def f():
await stall.map(list(range(100)))
async def set_concurrency_limit():
async with get_client() as client:
await client.create_concurrency_limit(
tag='stall',
concurrency_limit=5
)
if __name__ == '__main__':
asyncio.run(set_concurrency_limit())
asyncio.run(f())
Just wondering if there have been an updates or workarounds posted. We're still manually checking our concurrency limits every day to recreate them if needed due to crashed/zombie tasks filling up the slots.
Is a fix for this feature going to be prioritized any time soon? We have some users with a lot of interest in this feature but it's essentially unusable right now due to crashed/zombie Tasks filling up slots. This is exacerbated by the fact that we currently can't mass cancel Tasks through the UI
Is there a way to delete these zombie task allotments directly from the DB? My current workaround is to double the configured concurrency in order to allow new flows/tasks to run as expected but that's ... not ideal.
Small note here because I don't see it mentioned elsewhere on this issue - it is possible to reset a concurrency limit from both the cli (prefect concurrency-limit reset) and the UI (see screenshot).
I can also recreate when deleting a flow run but can no longer recreate when a task run is deleted. I'd welcome any feedback or an updated MRE if this is still an issue.