Triggerer error "deque mutated during iteration"

Open zhenghanyang opened this issue 1 year ago • 2 comments

Apache Airflow version

2.8.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

When more than 20 thousand trigger jobs are started, the triggerer may report the following error and shut down.

[2024-01-28T21:14:50.981+0800] {triggerer_job_runner.py:481} INFO - 20373 triggers currently running
[2024-01-28T21:14:51.451+0800] {triggerer_job_runner.py:576} INFO - Triggerer's async thread was blocked for 0.23 seconds, likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 to get more information on overrunning coroutines.
[2024-01-28T21:14:53.754+0800] {triggerer_job_runner.py:598} INFO - trigger test_trigger_job_03/scheduled__2024-01-28T12:10:00+00:00/wait_for_file_zhy685/-1/1 (ID 20384) starting
[2024-01-28T21:14:53.754+0800] {triggerer_job_runner.py:598} INFO - trigger test_trigger_job_03/scheduled__2024-01-28T12:10:00+00:00/wait_for_file_zhy662/-1/1 (ID 20385) starting

[2024-01-28T21:14:53.773+0800] {triggerer_job_runner.py:341} ERROR - Exception when executing TriggererJobRunner._run_trigger_loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 339, in _execute
    self._run_trigger_loop()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 362, in _run_trigger_loop
    self.load_triggers()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 377, in load_triggers
    self.trigger_runner.update_triggers(set(ids))
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 650, in update_triggers
    running_trigger_ids.union(x[0] for x in self.events)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 652, in <genexpr>
    .union(x[0] for x in self.to_create)
RuntimeError: deque mutated during iteration
[2024-01-28T21:14:53.773+0800] {triggerer_job_runner.py:344} INFO - Waiting for triggers to clean up

What you think should happen instead?

The error message shows that the to_create deque was mutated while it was being iterated. The failing code runs in the triggerer main thread, in _run_trigger_loop(): the main thread iterates over the to_create queue and also appends new triggers to it, which on its own works fine. However, there is another thread, TriggerRunner, which creates trigger tasks from the same queue. I also found that the iteration over the queue was added by https://github.com/apache/airflow/commit/16b8c476518ed76e3689966ec4b0b788be935410#diff-a5a8f5ab18cc034fa7a64181443415a1cd86aca1ce43a3aaa39e73262941118b
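
To illustrate the failure mode, here is a minimal single-threaded sketch, not the Airflow code itself. The queue contents and the helper `ids_while_consumed` are made up for the example, and the `popleft()` call inside the loop only stands in for whatever the second thread does to the queue between two iteration steps.

```python
from collections import deque

# Hypothetical stand-in for the to_create queue: (trigger_id, trigger) pairs.
to_create = deque((i, f"trigger-{i}") for i in range(5))


def ids_while_consumed(queue):
    """Collect IDs from the queue while a simulated consumer removes an item
    partway through the iteration."""
    seen = set()
    for n, item in enumerate(queue):
        seen.add(item[0])
        if n == 1:
            # Stands in for the other thread taking an item off the queue.
            queue.popleft()
    return seen


try:
    ids_while_consumed(to_create)
    error_message = None
except RuntimeError as exc:
    # The deque iterator notices the change on its next step and refuses to go on.
    error_message = str(exc)

print(error_message)
```

The deque iterator checks on every step whether the deque has changed since the iteration started, so any append or pop that lands between two steps is enough to raise the error.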

How to reproduce

Just run a large number of trigger jobs.

Operating System

Redhat 8.9

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

Maybe we can iterate over a copy of the queue?
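
A rough sketch of that idea, under assumptions: `GuardedQueue` and its methods are hypothetical names, not part of Airflow, and the lock is something I added so that the copy itself cannot race with a concurrent append or pop. The reader iterates over a snapshot list, so later changes to the real queue do not affect the loop.

```python
import threading
from collections import deque


class GuardedQueue:
    """Hypothetical wrapper around a deque; not an Airflow class."""

    def __init__(self):
        self._items = deque()
        self._lock = threading.Lock()

    def append(self, item):
        with self._lock:
            self._items.append(item)

    def popleft_or_none(self):
        # Return the oldest item, or None if the queue is empty.
        with self._lock:
            return self._items.popleft() if self._items else None

    def snapshot(self):
        # Copy under the lock; callers iterate the copy, never the deque.
        with self._lock:
            return list(self._items)


queue = GuardedQueue()
for i in range(5):
    queue.append((i, f"trigger-{i}"))

pending_ids = set()
for n, (trigger_id, _trigger) in enumerate(queue.snapshot()):
    pending_ids.add(trigger_id)
    if n == 1:
        # Simulated consumer removing an item while the reader is mid-loop.
        queue.popleft_or_none()

print(sorted(pending_ids))
```

The trade-off is that the snapshot can be slightly stale: an item removed by the consumer after the copy was taken still shows up in the reader's set for that pass.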

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

zhenghanyang, Jan 28 '24 14:01

Is this all running on a single triggerer? If so, would you be able to split the load across multiple triggerers?

RNHTTR, Jan 28 '24 15:01

Yes, on a single triggerer. We have now split the load across three triggerers and added a periodic job that restarts any triggerer that shuts down. In addition, I am a bit confused about why the issue occurs only when there is a large number of trigger jobs. There are two threads in the triggerer: one iterates over the queue and the other mutates it, so shouldn't the issue happen easily?

zhenghanyang, Jan 29 '24 11:01