openlineage, celery: scheduler hanging when emitting lots of OL events via HTTP
Apache Airflow Provider(s)
celery, openlineage
Versions of Apache Airflow Providers
No response
Apache Airflow version
2.9.0, but it happens on 2.7+ too
Operating System
Darwin MacBook-Pro.local 23.4.0 Darwin Kernel Version 23.4.0: Fri Mar 15 00:10:42 PDT 2024; root:xnu-10063.101.17~1/RELEASE_ARM64_T6000 arm64
Deployment
Other
Deployment details
The issue can be reproduced in all environments, both locally with Breeze and in cloud deployments, e.g. Astro Cloud.
What happened
The OpenLineage listener hooks into DagRun state changes via on_dag_run_running/failed/success. When OL events are emitted via HTTP at large scale, the scheduler hangs and needs a restart. The issue appears to happen only with CeleryExecutor.
This could not be reproduced when disabling OpenLineage (with [openlineage] disabled = True) or with any other OpenLineage transport that doesn't use HTTP. I also experimented with raw urllib3 and httpx as alternatives to requests. All of these experiments produced the same bug, resulting in the scheduler hanging.
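For context, the emission path looks roughly like this - a minimal sketch with assumed names and a hard-coded URL, not the provider's actual code:

from __future__ import annotations

import requests  # the OpenLineage HTTP transport is built on requests

from airflow.listeners import hookimpl


class SketchedListener:
    @hookimpl
    def on_dag_run_running(self, dag_run, msg):
        # Each DagRun state change becomes a synchronous POST issued from
        # inside the scheduler process; with hundreds of concurrent DagRuns
        # that is hundreds of blocking HTTP calls on listener threads.
        requests.post(
            "http://host.docker.internal:5000/api/v1/lineage",  # repro URL below
            json={"eventType": "START"},
            timeout=5,
        )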
What you think should happen instead
When reproducing with a local Breeze setup with CeleryExecutor, there is this strange behaviour:
htop output: (screenshot)
lsof | grep CLOSE_WAIT output: (screenshot)
Stack from the main loop of the scheduler:
Traceback for thread 152 (airflow) [] (most recent call last):
(Python) File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
(Python) File "/opt/airflow/airflow/__main__.py", line 58, in main
args.func(args)
(Python) File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
(Python) File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
return f(*args, **kwargs)
(Python) File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
return func(*args, **kwargs)
(Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 58, in scheduler
run_command_with_daemon_option(
(Python) File "/opt/airflow/airflow/cli/commands/daemon_utils.py", line 85, in run_command_with_daemon_option
callback()
(Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 61, in <lambda>
callback=lambda: _run_scheduler_job(args),
(Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 49, in _run_scheduler_job
run_job(job=job_runner.job, execute_callable=job_runner._execute)
(Python) File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
return func(*args, session=session, **kwargs)
(Python) File "/opt/airflow/airflow/jobs/job.py", line 410, in run_job
return execute_job(job, execute_callable=execute_callable)
(Python) File "/opt/airflow/airflow/jobs/job.py", line 439, in execute_job
ret = execute_callable()
(Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 847, in _execute
self._run_scheduler_loop()
(Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 982, in _run_scheduler_loop
self.job.executor.heartbeat()
(Python) File "/opt/airflow/airflow/executors/base_executor.py", line 240, in heartbeat
self.trigger_tasks(open_slots)
(Python) File "/opt/airflow/airflow/executors/base_executor.py", line 298, in trigger_tasks
self._process_tasks(task_tuples)
(Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 292, in _process_tasks
key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
(Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 342, in _send_tasks_to_celery
key_and_async_results = list(
(Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 484, in _chain_from_iterable_of_lists
for element in iterable:
(Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 619, in result_iterator
yield fs.pop().result()
(Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 439, in result
self._condition.wait(timeout)
(Python) File "/usr/local/lib/python3.8/threading.py", line 302, in wait
waiter.acquire()
Stack from one of the spawned child scheduler processes:
Traceback for thread 6363 (airflow) [] (most recent call last):
(Python) File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
(Python) File "/opt/airflow/airflow/__main__.py", line 58, in main
args.func(args)
(Python) File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
(Python) File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
return f(*args, **kwargs)
(Python) File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
return func(*args, **kwargs)
(Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 58, in scheduler
run_command_with_daemon_option(
(Python) File "/opt/airflow/airflow/cli/commands/daemon_utils.py", line 85, in run_command_with_daemon_option
callback()
(Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 61, in <lambda>
callback=lambda: _run_scheduler_job(args),
(Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 49, in _run_scheduler_job
run_job(job=job_runner.job, execute_callable=job_runner._execute)
(Python) File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
return func(*args, session=session, **kwargs)
(Python) File "/opt/airflow/airflow/jobs/job.py", line 410, in run_job
return execute_job(job, execute_callable=execute_callable)
(Python) File "/opt/airflow/airflow/jobs/job.py", line 439, in execute_job
ret = execute_callable()
(Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 847, in _execute
self._run_scheduler_loop()
(Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 982, in _run_scheduler_loop
self.job.executor.heartbeat()
(Python) File "/opt/airflow/airflow/executors/base_executor.py", line 240, in heartbeat
self.trigger_tasks(open_slots)
(Python) File "/opt/airflow/airflow/executors/base_executor.py", line 298, in trigger_tasks
self._process_tasks(task_tuples)
(Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 292, in _process_tasks
key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
(Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 343, in _send_tasks_to_celery
send_pool.map(send_task_to_executor, task_tuples_to_send, chunksize=chunksize)
(Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 674, in map
results = super().map(partial(_process_chunk, fn),
(Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 608, in map
fs = [self.submit(fn, *args) for args in zip(*iterables)]
(Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 608, in <listcomp>
fs = [self.submit(fn, *args) for args in zip(*iterables)]
(Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 645, in submit
self._start_queue_management_thread()
(Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 584, in _start_queue_management_thread
self._adjust_process_count()
(Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 608, in _adjust_process_count
p.start()
(Python) File "/usr/local/lib/python3.8/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
(Python) File "/usr/local/lib/python3.8/multiprocessing/context.py", line 277, in _Popen
return Popen(process_obj)
(Python) File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
(Python) File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", line 75, in _launch
code = process_obj._bootstrap(parent_sentinel=child_r)
(Python) File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
(Python) File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
(Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
(Python) File "/usr/local/lib/python3.8/multiprocessing/queues.py", line 97, in get
res = self._recv_bytes()
(Python) File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
(Python) File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
buf = self._recv(4)
(Python) File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
This points to the fact that the bug probably has something to do with file descriptors not being closed properly. I did not follow the whole logic of how the scheduler spawns child processes, but before the scheduler gets stuck it spawns some child processes that close properly and do not cause the issue.
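For illustration, here is a minimal sketch of the suspected hazard and one generic mitigation; the mechanism is an assumption on my part, not a confirmed root cause:

from __future__ import annotations

import concurrent.futures
import multiprocessing


def double(x: int) -> int:
    return x * 2


if __name__ == "__main__":
    # On Linux, ProcessPoolExecutor defaults to the "fork" start method: the
    # child inherits a copy of the parent's memory, including locks and open
    # sockets held by *other* threads (e.g. threads mid-way through an HTTP
    # request). A lock held at fork time is never released in the child, so
    # the child can block forever the first time it touches it.
    # A "spawn" context avoids this by starting children from a fresh
    # interpreter (shown here as a generic mitigation, not the proposed fix):
    ctx = multiprocessing.get_context("spawn")
    with concurrent.futures.ProcessPoolExecutor(max_workers=2, mp_context=ctx) as pool:
        print(list(pool.map(double, range(4))))  # [0, 2, 4, 6]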
How to reproduce
- Set up Breeze.
- Create a dynamic_dag.jinja2 template:
from __future__ import annotations

from pendulum import datetime

from airflow.decorators import dag, task


@task
def return_string(path):
    return path


@dag(
    start_date=datetime(2023, 1, 1),
    max_active_runs={{ max_active_runs }},
    schedule=None,
    catchup=False,
)
def dynamic_dag_{{ number }}():
    return_string("whatever")


dynamic_dag_{{ number }}()
- Create DAGs in the files/dags Airflow directory, e.g. with the following script:
from __future__ import annotations

import pathlib

import jinja2

DAGS_LOCATION = pathlib.Path.home() / "airflow" / "files" / "dags"  # change if Airflow is in another location
TEMPLATE_LOCATION = pathlib.Path(__file__).resolve().parent / "dynamic_dag.jinja2"  # this should point to the template file
NO_OF_DAGS = 10
MAX_ACTIVE_RUNS = 100

if __name__ == '__main__':
    jinja_env = jinja2.Environment()
    template = jinja_env.from_string(TEMPLATE_LOCATION.read_text())
    for i in range(NO_OF_DAGS):
        with open(DAGS_LOCATION / f"dynamic_dag_{i}.py", 'w') as f:
            f.write(template.render(number=i, max_active_runs=MAX_ACTIVE_RUNS))
- Create trigger_dag in files/dags:
from __future__ import annotations

from datetime import datetime, timedelta

from airflow import DAG
from airflow.api.common.trigger_dag import trigger_dag
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
from airflow.utils.types import DagRunType

default_args = {
    "owner": "airflow",
    "start_date": datetime(2023, 1, 1),
    "depends_on_past": False,
}

NO_OF_DAGS = 10
NO_OF_TRIGGERS_PER_DAG = 100


def trigger(**kwargs):
    dag_runs = []

    def trigger_dags(x):
        for i in range(NO_OF_DAGS):
            trigger_dag_id = f"dynamic_dag_{i}"
            parsed_execution_date = timezone.utcnow()
            dag_model = DagModel.get_dagmodel(trigger_dag_id)
            if not dag_model:
                continue
            dag_model.set_is_paused(False)
            run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_execution_date)
            dag_run = trigger_dag(
                dag_id=trigger_dag_id,
                run_id=run_id + str(x),
                execution_date=parsed_execution_date,
                replace_microseconds=False,
            )
            dag_runs.append(dag_run)

    for i in range(NO_OF_TRIGGERS_PER_DAG):
        trigger_dags(i)


# Main DAG that will trigger DAGs
with DAG(
    "trigger_dag",
    default_args=default_args,
    schedule_interval=timedelta(days=1),  # Adjust as needed
    catchup=False,
) as dag:
    start = EmptyOperator(task_id="start")
    trigger_dags = PythonOperator(task_id="trigger_dags", python_callable=trigger)

    start >> trigger_dags
- Host a local server (skip this if you want to use some other one, e.g. httpbin.org). A quick sanity check for this stub server is sketched after these steps.
- Flask server in app.py:

from __future__ import annotations

import sys
import time

from flask import Flask, Response, request

app = Flask(__name__)


@app.route('/', defaults={'path': ''})
@app.route('/<path:path>', methods=['GET', 'HEAD', 'POST', 'PUT', 'DELETE'])
def catch_all(path):
    # time.sleep(4)  # uncomment to delay the response
    if request.is_json:
        print(request.json, file=sys.stderr)
    return Response('You want path: %s' % path, status=502)  # change the status if needed


if __name__ == '__main__':
    app.run()
- requirements.txt:

flask==2.3.2
python-dateutil==2.8.2
gunicorn
- Dockerfile:

FROM python:3-alpine AS builder
WORKDIR /app
RUN python3 -m venv venv
ENV VIRTUAL_ENV=/app/venv
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
COPY requirements.txt .
RUN pip install -r requirements.txt

# Stage 2
FROM python:3-alpine AS runner
WORKDIR /app
COPY --from=builder /app/venv venv
COPY app.py app.py
ENV VIRTUAL_ENV=/app/venv
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
ENV FLASK_APP=app/app.py
EXPOSE 8080
CMD ["gunicorn", "--bind", ":8080", "--workers", "2", "app:app"]
- Build the image with: docker build . -t test-server
- Run it with: docker run -e FLASK_RUN_PORT=5000 -e FLASK_APP=app.py --rm -p 5000:8080 test-server
- In files/airflow-breeze-config/variables.env put the following entries:

AIRFLOW__CORE__LAZY_DISCOVER_PROVIDERS=False
OPENLINEAGE_URL=http://host.docker.internal:5000
AIRFLOW__CORE__LOGGING_LEVEL=DEBUG  # if you want DEBUG logs

An example for httpbin.org would be to set the following instead of OPENLINEAGE_URL:

AIRFLOW__OPENLINEAGE__TRANSPORT={"type":"http","url":"https://httpbin.org/status/401","endpoint":""}
- Run Breeze with breeze start-airflow --executor CeleryExecutor and trigger trigger_dag. Wait until the scheduler hangs and DAGs get stuck in the queued/running state.
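With the stub server from the earlier step running, a quick sanity check (host/port as in the docker run command above) confirms it answers with the configured 502 before Airflow is pointed at it:

import requests

resp = requests.post("http://localhost:5000/api/v1/lineage", json={"ping": 1}, timeout=5)
print(resp.status_code, resp.text)  # expected: 502 You want path: api/v1/lineage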
Anything else
The issue is reproducible with a considerable number of concurrent DagRuns (100+ in my case; it may depend on the setup). I also did some tests with raw threading only and/or skipping the OpenLineage dependencies altogether (sending requests directly from the on_dag_run_... hooks); these produced the same hang.
I am willing to submit a PR that potentially fixes the bug - the change is to use a ThreadPoolExecutor instead of the ProcessPoolExecutor in the Celery executor's task-sending path (the stacks above show the current code going through concurrent.futures.process). This seems to fix it completely and not break anything else.
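Roughly, the change looks like this - a simplified, self-contained sketch based on the _send_tasks_to_celery frames in the stacks above; names and surrounding logic are approximate, not the actual PR:

from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor


def send_task_to_executor(task_tuple):
    # Stand-in for the provider's function; the real one calls
    # task.apply_async(...) against the Celery broker.
    return task_tuple


def send_tasks_to_celery(task_tuples_to_send, num_workers=4, chunksize=1):
    # Before (per the stacks above): a ProcessPoolExecutor forks the
    # scheduler, inheriting lock/socket state from the OpenLineage HTTP
    # threads. A ThreadPoolExecutor shares the parent's memory instead, so
    # there is no fork and no half-held inherited lock to deadlock on.
    with ThreadPoolExecutor(max_workers=num_workers) as send_pool:
        return list(send_pool.map(send_task_to_executor, task_tuples_to_send, chunksize=chunksize))


if __name__ == "__main__":
    print(send_tasks_to_celery([("task_key_1", 1), ("task_key_2", 2)]))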
I could not find the root cause in the end. My gut says it has something to do with threading in Python plus some Celery dependency on file descriptors.
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
@mobuchowski can you take a look?
Hey @RNHTTR - this is the result of our internal investigation. We think someone knowledgeable about Celery might be able to help; if not, we'd love to go with #39235, as it solves the issue in our tests.