Cancelling a task run with `.set_task_run_state`
We have a use case where we want to cancel a flow run of a flow/deployment before the next flow run starts. This isn't supported AFAIK in Prefect, so we've rolled our own code.
All of our tasks wrap ShellOperation and spin up a docker container.
I can cancel a flow run via set_flow_run_state and it works great. However, cancelling a flow run apparently doesn't also cancel its task runs.
Thus, I've added set_task_run_state to our script to cancel the task runs within the flow run as well. Although this changes the state of the task runs in the Prefect UI, the docker containers attached to those tasks keep running and don't appear to be cleaned up, even after waiting for some time.
For cancelling tasks, I've tried passing both state=Cancelling() and state=Cancelled() to set_task_run_state, but neither seems to stop the docker containers.
Our docker containers do appear to be killed appropriately when flows/tasks complete, during scheduled runs, or when we interact with the Prefect UI, but the same doesn't happen when we use the Python API. Perhaps the Prefect UI does something behind the scenes to kill any processes attached to task runs, which has to be done manually when using set_flow_run_state?
Possibly related: https://github.com/PrefectHQ/prefect/issues/7753
Expectation / Proposal
First, I'm curious to know where the problem lies. It could be in the prefect library or in prefect-shell, or perhaps we have to manage the docker containers started by ShellOperation ourselves if we're managing task states manually. If this is unlikely to be an issue here, I'll close this.
Second, I would expect that when I set a task run's state to Cancelling or Cancelled, the linked docker container is also shut down/killed.
Third, as a bonus, cancelling a flow run would also cancel its task runs AND the docker containers linked to those tasks.
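If, as suspected above, setting a state through the API only records the new state, then nothing forces the process executing the task to stop the child processes it started, and the task code would have to notice the cancellation and shut them down itself. The following is a minimal standard-library sketch of that pattern, not how Prefect or prefect-shell actually behave: `run_until_cancelled` and the `is_cancel_requested` callback are hypothetical names, and in practice the callback might poll the task run's state through the Prefect client. It also assumes the container is started with a foreground `docker run`; terminating the `docker run` client does not necessarily stop the container, so a real version would probably also run `docker stop` against a known container name.

```python
import subprocess
import sys
import time
from typing import Callable, Sequence


def run_until_cancelled(
    command: Sequence[str],
    is_cancel_requested: Callable[[], bool],
    poll_interval: float = 0.5,
    grace_period: float = 5.0,
) -> int:
    """Run `command` as a child process and stop it if cancellation is requested.

    `is_cancel_requested` is a hypothetical hook; in a real task it might read
    the task run's state from the Prefect API and return True once it is
    Cancelling or Cancelled.
    """
    proc = subprocess.Popen(command)
    try:
        while proc.poll() is None:
            if is_cancel_requested():
                proc.terminate()  # polite stop first (SIGTERM on POSIX)
                try:
                    proc.wait(timeout=grace_period)
                except subprocess.TimeoutExpired:
                    proc.kill()  # escalate if the child ignores SIGTERM
                    proc.wait()
                break
            time.sleep(poll_interval)
    finally:
        # Never leave an orphaned child behind, even if the hook raises.
        if proc.poll() is None:
            proc.kill()
            proc.wait()
    return proc.returncode


if __name__ == "__main__":
    # Demo: a child that would sleep for 30 s is stopped after about 1 s.
    started = time.monotonic()
    rc = run_until_cancelled(
        [sys.executable, "-c", "import time; time.sleep(30)"],
        is_cancel_requested=lambda: time.monotonic() - started > 1.0,
    )
    print("child exit code:", rc)
```

Polling keeps the sketch simple; the trade-off is that cancellation takes effect up to `poll_interval` seconds late, plus the `grace_period` if the child ignores SIGTERM.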
Traceback / Example
Here's the function that we use to cancel a single task run.
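from prefect import get_client
from prefect.states import Cancelled


async def cancel_task_run(task_run):
    """Cancel a task run.

    :param task_run: (TaskRun) An object of class `TaskRun`
    """
    async with get_client() as client:
        # force=True tells the API to skip orchestration rules for this transition
        res = await client.set_task_run_state(
            task_run_id=str(task_run.id), state=Cancelled(), force=True
        )
    return res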
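To approximate the "bonus" behaviour (cancelling a flow run together with its task runs), a script built on `cancel_task_run` would first have to pick out the task runs that have not finished yet. The sketch below shows only that selection step, using plain dataclasses as hypothetical stand-ins for Prefect's `TaskRun` objects; the `RunState` enum is assumed to mirror the names of Prefect's `StateType` values and is not imported from Prefect.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Iterable, List


class RunState(Enum):
    # Hypothetical stand-in mirroring Prefect's StateType names.
    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    CANCELLING = "CANCELLING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CRASHED = "CRASHED"
    CANCELLED = "CANCELLED"


# States after which a run will not execute further; everything else
# (including CANCELLING, which is still in progress) is a candidate.
TERMINAL_STATES = {
    RunState.COMPLETED,
    RunState.FAILED,
    RunState.CRASHED,
    RunState.CANCELLED,
}


@dataclass
class TaskRunInfo:
    """Hypothetical minimal view of a task run: its ID and current state."""
    id: str
    state: RunState


def task_runs_to_cancel(task_runs: Iterable[TaskRunInfo]) -> List[str]:
    """Return the IDs of task runs that are not yet in a terminal state."""
    return [tr.id for tr in task_runs if tr.state not in TERMINAL_STATES]
```

In a real script the list would come from the client (for example via `client.read_task_runs` with a flow-run filter; check the exact signature for your Prefect version), and each selected ID would then be passed to `set_task_run_state`. As this report shows, changing the state alone does not appear to stop the containers.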
- [x] I would like to help contribute a pull request to resolve this!
I'm interested in helping, but I'm not sure whether this is a prefect-shell issue.
I'm having the same issue. I'm cancelling a flow run using the following code:
flow_runs = await client.read_flow_runs(
    flow_run_filter=FlowRunFilter(id={"any_": [flow_run_id]}), limit=1
)
# Copy the current state, changing only its name and type
state_updates = {"name": "Cancelled", "type": StateType.CANCELLED}
state = flow_runs[0].state.copy(update=state_updates)
await client.set_flow_run_state(flow_run_id=flow_run_id, state=state, force=True)
I've noticed that although the flow run state is set correctly, the task runs are not cancelled; they still execute. I've also tried
tr_state_updates = {"name": "Cancelled", "type": StateType.CANCELLED}
state = flow_runs[0].state.copy(update=tr_state_updates)
await client.set_task_run_state(task_run_id=task_run.id, state=state, force=True)
after cancelling the flow run with the previous code, but the task runs still execute. The agent type I'm using is Process.
I encountered the same issue. How can this be resolved?
Same issue here. Are there any solutions?