Duplicate pods running same flow run
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
When a flow with retries enabled fails in a Kubernetes environment, the same flow run is executed in parallel in two different pods once it is rescheduled. When we checked the Kubernetes cluster to see what was happening, we saw that the pod that first executes the flow and fails is not killed; when it resumes execution to retry, another pod is created and executes the same flow in parallel. This happens so frequently that we had to disable all retries.
Reproduction
import time

from prefect import flow

@flow(log_prints=True, retries=3, retry_delay_seconds=10)
def fail_after(seconds: int = None):
    seconds = seconds or 5
    time.sleep(seconds)
    raise Exception("failed")
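For completeness, here is a minimal sketch of how such a flow might be deployed to a Kubernetes work pool; the deployment name, work pool name, and image below are illustrative assumptions, not configuration from this report:

# Minimal deployment sketch appended to the same file; the work pool and
# image names are assumptions, not the real configuration from this report.
if __name__ == "__main__":
    fail_after.deploy(
        name="fail-after-repro",                      # hypothetical deployment name
        work_pool_name="k8s-pool",                    # hypothetical Kubernetes work pool
        image="prefecthq/prefect:2.16.6-python3.10",  # matches the reported version
        build=False,                                  # assume the image already exists
        push=False,
    )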
Error
There are no errors, but duplicate executions of the flow result in unwanted and unexpected behaviour.
Versions
Version: 2.16.6
API version: 0.8.4
Python version: 3.10.14
Git commit: 3fecd435
Built: Sat, Mar 23, 2024 4:06 PM
OS/Arch: linux/x86_64
Profile: default
Server type: server
Additional context
I guess the problem is that when a flow fails, the Kubernetes pod is not killed but waits until its proposed state is accepted, which is RUNNING. During this time the Prefect server updates the flow run's state to Scheduled(AwaitingRetry). Meanwhile, the Kubernetes worker polls the work pool for scheduled flow runs, and when the retry time comes a new pod is created for the same flow run.
The Kubernetes worker sets the flow run's state from SCHEDULED to PENDING, and then the newly created pod tries to set the flow run's state to RUNNING. At the same time the original pod, which had proposed the state RUNNING and was sleeping, wakes up and sets the state from RUNNING to RUNNING.
Here is the if block that does it: https://github.com/PrefectHQ/prefect/blob/6c188abd7846d12984c9c33d56ef41b0c74914f9/src/prefect/engine.py#L955C9-L963C84
if not state.is_final() and not state.is_paused():
    logger.info(
        (
            f"Received non-final state {state.name!r} when proposing final"
            f" state {terminal_state.name!r} and will attempt to run again..."
        ),
    )
    # Attempt to enter a running state again
    state = await propose_state(client, Running(), flow_run_id=flow_run.id)
All these transitions are accepted, since as far as I can see there are no policies in the Prefect codebase to prevent them. The only policy that prevents duplicate executions applies when proposing the state PENDING: the orchestration rule PreventPendingTransitions. If two parallel workers try to set a flow run's state to PENDING, one fails. So Prefect assumes that whenever a flow is executed its state is set to PENDING first, but in this case one of the pods (the old one) tries to go from SCHEDULED to RUNNING directly, while the other (the new one) goes through PENDING and then RUNNING as it should. The assumption fails, hence duplicate executions are possible.
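To illustrate the shape such a guard could take, here is a hypothetical rule written in the style of Prefect's server-side orchestration rules; this is not code from the Prefect codebase and not the real PreventPendingTransitions, just a sketch of how a second RUNNING proposal could be rejected:

# Hypothetical rule sketched in the style of Prefect's server-side
# orchestration rules (prefect/server/orchestration/core_policy.py).
# It is NOT the real PreventPendingTransitions; it only illustrates how a
# RUNNING -> RUNNING proposal from a stale pod could be aborted.
from prefect.server.orchestration.rules import BaseOrchestrationRule
from prefect.server.schemas import states


class PreventDuplicateRunningTransitions(BaseOrchestrationRule):
    FROM_STATES = [states.StateType.RUNNING]
    TO_STATES = [states.StateType.RUNNING]

    async def before_transition(self, initial_state, proposed_state, context):
        # A second RUNNING proposal means another process already owns
        # this flow run, so abort instead of accepting the transition.
        await self.abort_transition(
            reason="This flow run is already being executed."
        )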
I think killing the pod when the flow fails would be a quick solution to the problem.
Any thoughts on this? Duplicate executions are really annoying and a deal breaker for us, so this should probably be prioritized.
Hey @thetarby. My apologies - this issue seems to have slipped through our triage system. Can I check if adding an idempotency key would help here?
Hi @zhen0, I am not sure how I could pass an idempotency key to these flow runs; they are generated by the Prefect server and worker themselves.
However, I don't think an idempotency key would solve the problem here, because the problem is not duplicated flow runs. There is exactly one flow run, as there should be, but there are two pods executing that same flow run in parallel. For instance, if you have a flow that inserts some rows into a database, the rows are duplicated.
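For reference, here is a minimal sketch of where an idempotency key can be passed when a run is created explicitly through the client; as far as I can tell it deduplicates run creation only, not the parallel pod execution described here. The deployment id is a placeholder:

# Sketch only: an idempotency key applies when creating a flow run
# explicitly via the client; it deduplicates run creation, not pod
# execution. The deployment id below is a placeholder.
import asyncio
from uuid import UUID

from prefect.client.orchestration import get_client


async def trigger_run(deployment_id: UUID):
    async with get_client() as client:
        # Two calls with the same idempotency_key return the same flow run.
        return await client.create_flow_run_from_deployment(
            deployment_id,
            idempotency_key="nightly-load-2024-03-23",  # hypothetical key
        )


asyncio.run(trigger_run(UUID("00000000-0000-0000-0000-000000000000")))  # placeholder id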
It is very easy to replicate the problem: if you deploy and run the flow below, after 10 seconds you'll see two pods in a running state, both of which printed "will sleep for 100 seconds".
import time

from prefect import flow
from prefect.runtime import flow_run

@flow(log_prints=True, retries=4, retry_delay_seconds=10)
def fail_first(seconds: int = None):
    if flow_run.get_run_count() >= 2:
        print("will sleep for 100 seconds")
        time.sleep(100)
        return
    seconds = seconds or 5
    time.sleep(seconds)
    raise Exception("failed")
I tried it and took some screenshots for you.
First, I ran the flow from the UI:
Checked Kubernetes, and the first pod is running:
Then the first pod fails and retries; at the same time another pod is scheduled for the same flow run, and they execute in parallel:
Logs of the first pod:
09:19:02.049 | INFO | prefect.flow_runs.runner - Opening process...
/usr/local/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
warn(RuntimeWarning(msg))
09:19:05.038 | INFO | Flow run 'cute-mosquito' - Downloading flow code from storage at '.'
09:19:11.475 | ERROR | Flow run 'cute-mosquito' - Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 867, in orchestrate_flow_run
result = await flow_call.aresult()
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "/opt/prefect/flows/hello.py", line 72, in fail_first
raise Exception("failed")
Exception: failed
09:19:11.643 | INFO | Flow run 'cute-mosquito' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
09:19:21.761 | INFO | Flow run 'cute-mosquito' - will sleep for 100 seconds
Logs of the second pod:
09:19:17.345 | INFO | prefect.flow_runs.runner - Opening process...
/usr/local/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
warn(RuntimeWarning(msg))
09:19:20.153 | INFO | Flow run 'cute-mosquito' - Downloading flow code from storage at '.'
09:19:21.737 | INFO | Flow run 'cute-mosquito' - will sleep for 100 seconds
As you can see in the logs, "will sleep for 100 seconds" is printed twice for one flow run. If it were an insert query instead of a print statement, that would be a huge problem.
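To make that concrete, here is a toy, self-contained example of the hazard; the table name and payload are made up for illustration:

# Toy illustration of the hazard above: this insert is not idempotent, so
# two pods executing the same flow run would each append a row. The table
# name and payload are made up for this example.
import sqlite3

from prefect import flow


@flow
def record_event():
    conn = sqlite3.connect("events.db")
    conn.execute("CREATE TABLE IF NOT EXISTS events (payload TEXT)")
    # No unique key or idempotency guard: a duplicate execution of this
    # flow run writes a duplicate row.
    conn.execute("INSERT INTO events (payload) VALUES (?)", ("nightly-load",))
    conn.commit()
    conn.close()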
Any updates on this? We are facing a similar issue on 2.20.9.
Still occurring with Prefect 2.20.14.
We have switched to Airflow because of this problem. Being unable to use retries made it impossible for us to use Prefect. I am really surprised that this issue gets so little attention.
Hey everyone! This issue was fixed in the 3.x line of Prefect with https://github.com/PrefectHQ/prefect/pull/15482, but that change hasn't been ported back to the 2.x line. Since it looks like most of the folks in this thread are using a 2.x version, I'll port the fix back to the 2.x line today.