
Duplicate pods running same flow run

Open thetarby opened this issue 1 year ago • 4 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the Prefect documentation for this issue.
  • [X] I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

When a flow with retries enabled fails for some reason in a Kubernetes environment, the same flow run is executed in parallel in two different pods once it is rescheduled. When we checked the Kubernetes cluster to see what was happening, we saw that the pod that first executes the flow and fails is not killed; when it resumes execution to retry, another pod is created and executes the same flow in parallel. This happens so frequently that we had to disable all retries.

Reproduction

import time

from prefect import flow


@flow(log_prints=True, retries=3, retry_delay_seconds=10)
def fail_after(seconds: int = None):
    seconds = seconds or 5
    time.sleep(seconds)
    raise Exception("failed")

Error

There are no errors, but duplicate executions of the flow result in unwanted and unexpected behaviour.

Versions

Version:             2.16.6
API version:         0.8.4
Python version:      3.10.14
Git commit:          3fecd435
Built:               Sat, Mar 23, 2024 4:06 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server

Additional context

I guess the problem is that when a flow fails, the Kubernetes pod is not killed; it keeps waiting until its proposed state, RUNNING, is accepted. During this time the Prefect server updates the flow run's state to Scheduled (AwaitingRetry). At the same time, the Kubernetes worker polls the work pool for scheduled flow runs, and when the retry time comes a new pod is created for the same flow run.

The Kubernetes worker sets the flow run's state from SCHEDULED to PENDING, and then the newly created pod tries to set the flow run's state to RUNNING. At the same time, the old pod, which had proposed the RUNNING state and was sleeping, wakes up and sets the state from RUNNING to RUNNING.

Here is the if block that does it: https://github.com/PrefectHQ/prefect/blob/6c188abd7846d12984c9c33d56ef41b0c74914f9/src/prefect/engine.py#L955C9-L963C84

if not state.is_final() and not state.is_paused():
    logger.info(
        (
            f"Received non-final state {state.name!r} when proposing final"
            f" state {terminal_state.name!r} and will attempt to run again..."
        ),
    )
    # Attempt to enter a running state again
    state = await propose_state(client, Running(), flow_run_id=flow_run.id)

All these transitions are accepted since, as far as I can see in the Prefect codebase, there are no policies that block them. The only policy that prevents duplicate executions applies when the PENDING state is proposed, via the orchestration rule PreventPendingTransitions: if two parallel workers try to set a flow run's state to PENDING, one of them fails. So Prefect assumes that whenever a flow is executed, its state is first set to PENDING; but in this case one of the pods (the old one) tries to go from SCHEDULED to RUNNING directly, while the other (the new one) goes through PENDING and then RUNNING as it should. The assumption fails, hence duplicate executions are possible.
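
To make that gap concrete, here is a rough sketch of the two racing transition paths (illustration only: the flow run id is a placeholder and the run is assumed to start in a SCHEDULED state). Per the behaviour described above, all of these proposals are accepted because only the PENDING step is guarded:

import asyncio

from prefect import get_client
from prefect.states import Pending, Running

FLOW_RUN_ID = "00000000-0000-0000-0000-000000000000"  # placeholder, use a scratch run


async def main():
    async with get_client() as client:
        # New pod's path: SCHEDULED -> PENDING -> RUNNING. Only the PENDING
        # proposal is guarded by PreventPendingTransitions.
        pending = await client.set_flow_run_state(FLOW_RUN_ID, Pending())
        running_new = await client.set_flow_run_state(FLOW_RUN_ID, Running())
        # Old pod's path: it never proposes PENDING and goes straight to
        # RUNNING, which (per the report above) is also accepted.
        running_old = await client.set_flow_run_state(FLOW_RUN_ID, Running())
        for label, result in [
            ("pending (new pod)", pending),
            ("running (new pod)", running_new),
            ("running (old pod)", running_old),
        ]:
            print(label, result.status)


asyncio.run(main())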

I think killing the pod when the flow fails would be a quick solution to the problem, as sketched below.
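
For illustration only (this is not the actual change that was later made in PR #15482), the suggestion would amount to something like the following inside the block quoted above: rather than re-proposing Running and sleeping in the same pod, hand the run back so the worker's fresh pod is the only executor.

# Sketch of the suggested behaviour, not the real fix: when the server answers
# with a non-final state such as AwaitingRetry, stop this pod instead of
# racing the new pod that the worker will submit for the retry.
if not state.is_final() and not state.is_paused():
    logger.info(
        f"Received non-final state {state.name!r} when proposing final state"
        f" {terminal_state.name!r}; exiting so the retry runs in a fresh pod."
    )
    return state  # let the worker schedule the retry in a new pod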

thetarby commented Mar 30 '24 21:03

Any thoughts on this? Duplicate executions are really annoying and a deal breaker. It would be great if this could be prioritized.

thetarby commented Apr 03 '24 17:04

Hey @thetarby. My apologies - this issue seems to have slipped through our triage system. Can I check if adding an idempotency key would help here?

zhen0 commented Jul 10 '24 16:07

Hi @zhen0, I am not sure how I can pass an idempotency key to flow runs. These runs are generated by the Prefect server and worker themselves.

However, I don't think an idempotency key could solve the problem here, because the problem is not duplicated flow runs. There is one flow run, as there should be, but there are two pods executing the same flow run in parallel. For instance, if you have a flow that inserts some rows into a database, the rows are duplicated.

It is very easy to replicate the problem: if you deploy and run the flow below, after about 10 seconds you'll see two pods in the running state that both printed "will sleep for 100 seconds".

import time

from prefect import flow
from prefect.runtime import flow_run


@flow(log_prints=True, retries=4, retry_delay_seconds=10)
def fail_first(seconds: int = None):
    if flow_run.get_run_count() >= 2:
        print("will sleep for 100 seconds")
        time.sleep(100)
        return

    seconds = seconds or 5
    time.sleep(seconds)
    raise Exception("failed")
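
For completeness, this is roughly how the flow above could be deployed to a Kubernetes work pool to reproduce it; the pool name and image below are placeholders, not values from the original setup.

# Placeholder names for illustration; adjust to your own work pool and image.
if __name__ == "__main__":
    fail_first.deploy(
        name="fail-first-repro",
        work_pool_name="my-k8s-pool",
        image="my-registry/prefect-flows:latest",
    )

A run can then be triggered from the UI (as in the screenshots below) or with prefect deployment run 'fail-first/fail-first-repro'.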

I tried it and took some screenshots for you.

First I ran the flow from the UI: [screenshot]

Checked Kubernetes and saw the first pod running: [screenshot]

Then the first pod fails and retries; at the same time another pod is scheduled for the same flow run and they execute in parallel: [screenshot]

Logs of the first pod:

09:19:02.049 | INFO    | prefect.flow_runs.runner - Opening process...
/usr/local/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
  warn(RuntimeWarning(msg))
09:19:05.038 | INFO    | Flow run 'cute-mosquito' - Downloading flow code from storage at '.'
09:19:11.475 | ERROR   | Flow run 'cute-mosquito' - Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 867, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/prefect/flows/hello.py", line 72, in fail_first
    raise Exception("failed")
Exception: failed
09:19:11.643 | INFO    | Flow run 'cute-mosquito' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
09:19:21.761 | INFO    | Flow run 'cute-mosquito' - will sleep for 100 seconds

Logs of the second pod:

09:19:17.345 | INFO    | prefect.flow_runs.runner - Opening process...
/usr/local/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
  warn(RuntimeWarning(msg))
09:19:20.153 | INFO    | Flow run 'cute-mosquito' - Downloading flow code from storage at '.'
09:19:21.737 | INFO    | Flow run 'cute-mosquito' - will sleep for 100 seconds

As you can see in the logs, "will sleep for 100 seconds" is printed twice for one flow run. If it were an insert query instead of a print statement, that would be a huge problem.

thetarby commented Jul 11 '24 09:07

Any updates on this? We are facing a similar issue on 2.20.9.

srsapient commented Oct 31 '24 14:10

Still occurring with Prefect 2.20.14.

NicholasFiorentini commented Dec 09 '24 15:12

We have switched to Airflow because of this problem. Being unable to use retries made it impossible for us to use Prefect. I am really surprised this issue gets so little attention.

thetarby commented Dec 10 '24 10:12

Hey everyone! This issue was fixed in the 3.x line of Prefect with https://github.com/PrefectHQ/prefect/pull/15482, but that change hasn't been ported back to the 2.x line. Since it looks like most of the folks in this thread are using a 2.x version, I'll port the fix back to the 2.x line today.

desertaxle commented Dec 19 '24 16:12