
Root Cause Investigation: Intermittent failures and restarts of the DAG processor when using MySQL

Open wjddn279 opened this issue 2 months ago • 7 comments

Apache Airflow version

3.1.0

If "Other Airflow 2/3 version" selected, which one?

No response

What happened?

I deployed Airflow in a Kubernetes environment and observed that the dag-processor was restarting irregularly. Checking the error logs, I found the following issue (full log attached: full_log.txt):

File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to server during query')
[SQL: SELECT dag_priority_parsing_request.id, dag_priority_parsing_request.bundle_name, dag_priority_parsing_request.relative_fileloc
FROM dag_priority_parsing_request
WHERE dag_priority_parsing_request.bundle_name IN (%s)]
[parameters: ('dags-folder',)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2025-09-24T17:01:10.882+0900] {settings.py:494} DEBUG - Disposing DB connection pool (PID 7)

What you think should happen instead?

The _get_priority_files() function executes a query, during which the MySQL connection is unexpectedly closed. This raises an exception and causes the dag-processor to exit. I also identified other exceptions; while those do not lead to termination (they are covered by retry logic and try-catch blocks), they appear to be caused by the same underlying issue: sudden termination of MySQL connections.

By reviewing the queries arriving at MySQL during the error times, I confirmed that the connection was indeed being closed with a Quit command.

[Screenshot: MySQL query log showing the connection being closed with a Quit command]

How to reproduce

  • Deploy Airflow 3+
  • Use MySQL as the metadata database backend
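
For reference, a sketch of the metadata DB wiring this implies (the connection string and host are placeholders; the same value can be set as sql_alchemy_conn in the [database] section of airflow.cfg):

  import os

  # Placeholder DSN; points Airflow's metadata DB at MySQL via mysqlclient.
  os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = (
      "mysql+mysqldb://airflow:***@mysql-host:3306/airflow"
  )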

Operating System

k8s

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

  • k8s deployment
  • mysql 8.0+ version
  • official helm chart

Anything else?

Why is the Quit signal being sent?

In conclusion, the root cause is the recreation of the engine's existing pool object in the subprocess after a fork, as performed in airflow/settings.py#L426-L436.

  1. The subprocess recreates the engine's pool when it starts.
  2. The pool connection objects copied from the parent process during the fork lose their references.
  3. Because fork duplicates file descriptors, those copies still wrap the same sockets the parent is using; when they are garbage-collected, the driver closes them, terminating the server-side connections originally established by the parent.
  4. The parent process, unaware that its pool connections have been closed, attempts a query and encounters an error.
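
A minimal standalone sketch of this sequence, outside Airflow (not the project's code; the connection credentials are placeholders):

  import gc
  import os

  import MySQLdb  # mysqlclient

  # Placeholder credentials; any reachable MySQL instance will do.
  conn = MySQLdb.connect(host="localhost", user="airflow",
                         passwd="***", db="airflow")

  pid = os.fork()
  if pid == 0:
      # Child: drop the inherited reference, as the pool recreation effectively does.
      # The copy wraps the same socket as the parent's connection, and
      # mysqlclient's deallocator calls mysql_close(), which sends COM_QUIT.
      conn = None
      gc.collect()
      os._exit(0)

  os.waitpid(pid, 0)
  conn.cursor().execute("SELECT 1")
  # The parent fails here with a lost-connection error such as
  # (2013, 'Lost connection to server during query')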

To verify this, I added the following code to observe when the initially established connections from the parent process are garbage-collected:

  # Note: engine and log are the module-level objects in airflow.settings.
  import os
  import weakref
  from datetime import datetime
  from sqlalchemy import event

  @event.listens_for(engine, "connect")
  def set_mysql_timezone(dbapi_connection, connection_record):
      log.debug(f"[connect] New DB connection established, id={os.getpid()}")

      # Record the PID in which each object is garbage-collected.
      weakref.finalize(dbapi_connection, lambda: print(
          f"{datetime.now().isoformat()} dbapi_connection finalized via weakref in os {os.getpid()}"))
      weakref.finalize(connection_record, lambda: print(
          f"{datetime.now().isoformat()} connection_record finalized via weakref in os {os.getpid()}"))

The following logs were observed. Their timestamps exactly matched the times when the Quit command appeared in the MySQL query log, and the fact that they occurred in PID 417 indicates that the pool copied from the parent process was garbage-collected in the child process:

2025-09-22T13:41:30.352393 connection_record finalized via weakref in os 417
2025-09-22T13:41:30.352403 dbapi_connection finalized via weakref in os 417

Why does this issue occur only with MySQL and not with PostgreSQL?

Based on testing, both backends trigger garbage collection in the subprocess under the current code. However, in MySQL's case the driver explicitly sends a COM_QUIT command when a connection object is garbage-collected, as shown in mysqlclient/_mysql.c#L2233-L2243.
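
This driver behavior can be observed in isolation (placeholder credentials; with MySQL's general query log enabled, a "Quit" entry appears for the session):

  import MySQLdb  # mysqlclient

  conn = MySQLdb.connect(host="localhost", user="airflow",
                         passwd="***", db="airflow")
  del conn  # deallocation calls mysql_close(), which sends COM_QUIT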

In contrast, the PostgreSQL driver does not appear to close the connection itself; it seems to only close the socket, without sending a termination command. (This is the behavior we want in the forked child.)

Additionally, if the garbage-collection threshold is not reached (such as in small-scale DAG parsing scenarios), garbage collection does not occur, and in such cases the issue does not happen even with MySQL.
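
Whether that sweep ever runs can be checked with Python's gc module; a generation-0 collection is only triggered once allocations minus deallocations exceed the first threshold:

  import gc

  print(gc.get_threshold())  # defaults to (700, 10, 10)
  print(gc.get_count())      # current per-generation allocation counters
  # A child that parses only a handful of DAGs may never cross the
  # threshold, so the inherited connections are never collected and
  # no COM_QUIT is sent.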

Are you willing to submit PR?

  • [x] Yes I am willing to submit a PR!

Code of Conduct

  • [x] I agree to follow this project's Code of Conduct

wjddn279 · Oct 20 '25 12:10

Related issue: https://github.com/apache/airflow/issues/56497

wjddn279 · Oct 20 '25 12:10

Fix PR: https://github.com/apache/airflow/pull/56044

wjddn279 · Oct 20 '25 12:10

Thanks @wjddn279 for the detailed report. We have been facing a similar issue in our environment, with around 4 dag processors and 11k DAGs (each dag processor parsing around 3k DAGs) on MySQL 8.0. I was initially under the assumption that this was due to high connection counts and CPU load in MySQL dropping connections, but this issue could be the actual cause. Some relevant discussions for MySQL that I found after reading this issue:

https://github.com/sqlalchemy/sqlalchemy/issues/7815
https://docs.sqlalchemy.org/en/20/core/pooling.html
https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
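
For context, the pattern those docs describe for os.fork() looks roughly like this (a sketch, not Airflow's code; the engine URL is a placeholder, and dispose(close=False) requires SQLAlchemy >= 1.4.33):

  import os
  from sqlalchemy import create_engine

  engine = create_engine("mysql+mysqldb://airflow:***@mysql-host/airflow")

  def dispose_in_child():
      # Replace the inherited pool in the child without closing its
      # connections, leaving the parent's server-side sessions untouched.
      engine.dispose(close=False)

  os.register_at_fork(after_in_child=dispose_in_child)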

tirkarthi · Oct 20 '25 14:10

engine.dispose() is run in clean_in_fork, registered via after_in_child. I was wondering if instead registering a before hook that calls engine.dispose(close=False), i.e. register_at_fork(before=before), would help here, since the SQLAlchemy documentation recommends calling engine.dispose(close=False) before the child process is created, and I assume here engine.dispose() is called after the child is created.

https://docs.python.org/3/library/os.html#os.register_at_fork

before is a function called before forking a child process.

after_in_parent is a function called from the parent process after forking a child process.

after_in_child is a function called from the child process.
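
A small runnable demonstration of when each hook fires:

  import os

  os.register_at_fork(
      before=lambda: print(f"before fork, pid {os.getpid()}"),
      after_in_parent=lambda: print(f"after fork in parent, pid {os.getpid()}"),
      after_in_child=lambda: print(f"after fork in child, pid {os.getpid()}"),
  )

  pid = os.fork()
  if pid == 0:
      os._exit(0)
  os.waitpid(pid, 0)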

tirkarthi · Oct 20 '25 15:10

@tirkarthi

I think the current behavior (running clean_in_fork in after_in_child) is correct. The intention behind this behavior is to ensure that when a child process is forked, it detaches from the parent’s pool connections to avoid any unwanted side effects.

The key point is that the pool recreation must occur in the child process. If it happens in the parent process before the fork (as with register_at_fork(before=before)), the child process will inherit the pool object copied from the parent, which is not the intended behavior.
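
A toy illustration of that distinction, with no database involved: mutating state in an after_in_child hook affects only the child's copy, while the parent's state is untouched.

  import os

  state = {"pool": "created-in-parent"}

  def recreate_pool():
      state["pool"] = f"recreated-in-pid-{os.getpid()}"

  os.register_at_fork(after_in_child=recreate_pool)

  pid = os.fork()
  if pid == 0:
      print("child sees :", state["pool"])  # recreated-in-pid-<child pid>
      os._exit(0)
  os.waitpid(pid, 0)
  print("parent sees:", state["pool"])      # created-in-parent, untouched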

wjddn279 · Oct 20 '25 23:10

@tirkarthi

I’ve added a test that verifies both the modified logic and the error I described: https://github.com/apache/airflow/pull/56044. It would be great if you could take a look and review it!

wjddn279 · Oct 21 '25 00:10

@wjddn279 - BTW, I tried running without the connection pool setting, and it still happens.

AmosG · Dec 10 '25 08:12