Scheduler goes down for 1-2 minutes every 10 minutes as completed pods increase in EKS
Apache Airflow version
2.2.4 (latest released)
What happened
Hi Team, I am using Airflow 2.2.4, deployed on an AWS EKS cluster. Every 5-10 minutes a scheduler-down message appears in the Airflow UI. When I checked the Airflow scheduler log, I saw many statements like the one below.
[2022-03-21 08:21:21,640] {kubernetes_executor.py:729} INFO - Attempting to adopt pod sampletask.05b6f567b4a64bd5beb16e526ba94d7a
This statement is printed for every completed pod that exists in EKS, but it repeats multiple times, and each time it also invokes the PATCH API.
As I understand it, the code pulls the details of all completed pods from the EKS cluster on every pass and invokes the PATCH API on each completed pod. For 1,000 completed pods this finishes in about 1 minute; for 7,000 completed pods it takes 3-5 minutes, which is why the scheduler goes down.
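The behavior described above can be sketched roughly as follows. This is a minimal illustration, not Airflow's actual implementation: `FakeKubeClient`, `Pod`, and `adopt_completed_pods` are hypothetical names, and the `airflow-worker` label key is an assumption. The point is only that one PATCH per completed pod makes each pass scale linearly with the number of completed pods, and that the work is repeated on every pass.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Pod:
    name: str
    phase: str  # e.g. "Running" or "Succeeded"
    labels: Dict[str, str] = field(default_factory=dict)


class FakeKubeClient:
    """Hypothetical in-memory stand-in for a Kubernetes API client."""

    def __init__(self, pods: List[Pod]) -> None:
        self.pods = {p.name: p for p in pods}
        self.patch_calls = 0

    def list_pods(self, phase: str) -> List[Pod]:
        return [p for p in self.pods.values() if p.phase == phase]

    def patch_pod_labels(self, name: str, labels: Dict[str, str]) -> None:
        self.patch_calls += 1  # stands for one API round trip per pod
        self.pods[name].labels.update(labels)


def adopt_completed_pods(client: FakeKubeClient, scheduler_id: str) -> int:
    """Relabel every completed pod, issuing one PATCH per pod."""
    completed = client.list_pods(phase="Succeeded")
    for pod in completed:
        # The label key is an assumption made for this sketch.
        client.patch_pod_labels(pod.name, {"airflow-worker": scheduler_id})
    return len(completed)
```

With 7,000 completed pods, every pass issues 7,000 PATCH calls, and nothing in this sketch stops the next pass from doing the same work again.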
What you think should happen instead
The scheduler is healthy when we set "delete_worker_pods = True", but when "delete_worker_pods = False" is set and the completed pod count reaches 7,000 to 10,000, the scheduler goes down.
The scheduler should be healthy irrespective of how many completed pods exist in the EKS cluster.
How to reproduce
Deploy Airflow in a k8s cluster and set "delete_worker_pods = False". Once the number of completed pods reaches 7,000 to 10,000, you will be able to see this issue.
Operating System
OS:Debian GNU/Linux, VERSION: 10
Versions of Apache Airflow Providers
No response
Deployment
Other Docker-based deployment
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template!
cc: @dstandish -> what we talked about :)
This issue has been automatically marked as stale because it has been open for 365 days without any activity. There has been several Airflow releases since last activity on this issue. Kindly asking to recheck the report against latest Airflow version and let us know if the issue is reproducible. The issue will be closed in next 30 days if no further activity occurs from the issue author.
This issue has been closed because it has not received response from the issue author.
We are seeing this issue in Airflow 2.3.3. Based on the latest code, I strongly believe the issue is also present in the latest Airflow version, 2.9.1; I don't see any improvements in watcher performance between 2.3.3 and 2.9.1. The primary cause is that the Kubernetes pod watcher is not fast enough to keep up with the rate of Kubernetes events. This leads to a watcher failure/restart, after which adopt_complete_pods takes over the completed pods. adopt_complete_pods takes a couple of minutes, which delays the scheduler heartbeat, then causes scheduler liveness failures, and finally a scheduler pod restart.
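To illustrate the last step of that chain, here is a minimal sketch of a heartbeat-staleness check. It is an assumption-laden simplification, not the code Airflow runs: `is_scheduler_alive` is a hypothetical helper, and the 30-second threshold is assumed for illustration. It shows why a single blocking step of a couple of minutes is enough to make the scheduler look dead.

```python
from datetime import datetime, timedelta


def is_scheduler_alive(
    latest_heartbeat: datetime,
    now: datetime,
    threshold_seconds: int = 30,  # assumed threshold, for illustration only
) -> bool:
    """Treat the scheduler as alive only if its last heartbeat is recent enough."""
    return (now - latest_heartbeat).total_seconds() <= threshold_seconds


last_beat = datetime(2024, 5, 29, 6, 18, 0)

# A short loop iteration: the heartbeat is still fresh.
print(is_scheduler_alive(last_beat, last_beat + timedelta(seconds=5)))

# The loop was blocked for two minutes (e.g. by a long adoption pass):
# the heartbeat is stale and a health check would fail.
print(is_scheduler_alive(last_beat, last_beat + timedelta(minutes=2)))
```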
I have had no luck reproducing this because my cluster gets destroyed by OOM when I get to 2000/3000 tasks. I pruned my system but still run into this OOM.
Edit: I have now seen the restart:
[2024-05-29T06:28:38.937+0000] {process_utils.py:132} INFO - Sending Signals.SIGTERM to group 97. PIDs of all processes in the group: [7350, 7363, 97]
[2024-05-29T06:28:38.939+0000] {process_utils.py:87} INFO - Sending the signal Signals.SIGTERM to group 97
[2024-05-29T06:28:42.204+0000] {settings.py:425} DEBUG - Disposing DB connection pool (PID 7350)
[2024-05-29T06:28:42.212+0000] {settings.py:425} DEBUG - Disposing DB connection pool (PID 7363)
[2024-05-29T06:28:42.217+0000] {process_utils.py:263} INFO - Waiting up to 5 seconds for processes to exit...
[2024-05-29T06:28:42.237+0000] {process_utils.py:80} INFO - Process psutil.Process(pid=7363, status='terminated', started='06:28:38') (7363) terminated with exit code None
[2024-05-29T06:28:42.237+0000] {process_utils.py:80} INFO - Process psutil.Process(pid=97, status='terminated', exitcode=0, started='06:18:49') (97) terminated with exit code 0
[2024-05-29T06:28:42.237+0000] {process_utils.py:80} INFO - Process psutil.Process(pid=7350, status='terminated', started='06:28:36') (7350) terminated with exit code None
[2024-05-29T06:28:42.238+0000] {kubernetes_executor.py:740} INFO - Shutting down Kubernetes executor
[2024-05-29T06:28:42.238+0000] {kubernetes_executor.py:742} DEBUG - Flushing task_queue...
[2024-05-29T06:28:42.238+0000] {scheduler_job_runner.py:880} ERROR - Exception when executing Executor.end on KubernetesExecutor(parallelism=32)
Traceback (most recent call last):
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 856, in _execute
self._run_scheduler_loop()
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 989, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1071, in _do_scheduling
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
File "/opt/airflow/airflow/utils/retries.py", line 89, in wrapped_function
for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 435, in __iter__
do = self.iter(retry_state=retry_state)
File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 368, in iter
result = action(retry_state)
File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 390, in <lambda>
self._add_action_func(lambda rs: rs.outcome.result())
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
return self.__get_result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/opt/airflow/airflow/utils/retries.py", line 98, in wrapped_function
return func(*args, **kwargs)
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1415, in _schedule_all_dag_runs
callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1415, in <listcomp>
callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1433, in _schedule_dag_run
dag_model = DM.get_dagmodel(dag_run.dag_id, session)
File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
return func(*args, **kwargs)
File "/opt/airflow/airflow/models/dag.py", line 3737, in get_dagmodel
return session.get(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2853, in get
return self._get_impl(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2975, in _get_impl
return db_load_fn(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
session.execute(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1720, in execute
result = compile_state_cls.orm_setup_cursor_result(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/context.py", line 349, in orm_setup_cursor_result
return loading.instances(result, querycontext)
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", line 69, in instances
*[
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", line 70, in <listcomp>
query_entity.row_processor(context, cursor)
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/context.py", line 2631, in row_processor
_instance = loading._instance_processor(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", line 796, in _instance_processor
prop.create_row_processor(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/interfaces.py", line 658, in create_row_processor
strat.create_row_processor(
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 263, in _exit_gracefully
sys.exit(os.EX_OK)
SystemExit: 0
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 878, in _execute
executor.end()
File "/opt/airflow/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 743, in end
self._flush_task_queue()
File "/opt/airflow/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 699, in _flush_task_queue
self.log.debug("Executor shutting down, task_queue approximate size=%d", self.task_queue.qsize())
File "<string>", line 2, in qsize
File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 834, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 411, in _send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[2024-05-29T06:28:42.247+0000] {process_utils.py:132} INFO - Sending Signals.SIGTERM to group 97. PIDs of all processes in the group: []
[2024-05-29T06:28:42.247+0000] {process_utils.py:87} INFO - Sending the signal Signals.SIGTERM to group 97
[2024-05-29T06:28:42.247+0000] {process_utils.py:101} INFO - Sending the signal Signals.SIGTERM to process 97 as process group is missing.
[2024-05-29T06:28:42.247+0000] {scheduler_job_runner.py:886} INFO - Exited execute loop
[2024-05-29T06:28:42.258+0000] {cli_action_loggers.py:94} DEBUG - Calling callbacks: []
[2024-05-29T06:28:42.260+0000] {settings.py:425} DEBUG - Disposing DB connection pool (PID 11)
Liveness: exec [sh -c CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \
airflow jobs check --job-type SchedulerJob --local
] delay=10s timeout=20s period=60s #success=1 #failure=5
Startup: exec [sh -c CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \
airflow jobs check --job-type SchedulerJob --local
] delay=0s timeout=20s period=10s #success=1 #failure=6
Events:
Type     Reason     Age                   From     Message
----     ------     ----                  ----     -------
Warning  Unhealthy  73s (x90 over 7h19m)  kubelet  Liveness probe failed: command "sh -c CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \\\nairflow jobs check --job-type SchedulerJob --local\n" timed out
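As a rough back-of-the-envelope check on the liveness settings above (period=60s, timeout=20s, #failure=5), the sketch below estimates how long the probe has to keep failing before the container is restarted. It assumes probes start exactly one period apart and that every failing probe runs to its full timeout; `seconds_until_restart` is a hypothetical helper, and real kubelet timing will vary.

```python
def seconds_until_restart(period_s: int, timeout_s: int, failure_threshold: int) -> int:
    """Approximate time from the start of the first failed probe to the
    result of the last consecutive failure that triggers a restart."""
    return (failure_threshold - 1) * period_s + timeout_s


liveness_window = seconds_until_restart(period_s=60, timeout_s=20, failure_threshold=5)
print(liveness_window)  # 260
```

Under these assumptions, a scheduler that stays unresponsive for about 260 seconds gets restarted, which is in the same range as the 3-5 minutes reported earlier in this issue for 7,000 completed pods.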
@dviru , why would you set the pods not to be deleted? This leads to OOM because the pods occupy some space. Just trying to understand your needs and see if we should also have another config to check maximum number of completed pods that should be allowed to be in the deployment
This also seems related: https://github.com/apache/airflow/issues/38968
@ephraimbuddy Sometimes I want the pods to remain so that I can check the pod logs. But I am not seeing this issue in 2.7.3, even though we have 6,000-7,000 completed pods in the cluster.
This problem can be solved by using remote logging. It's not right to keep 7000 completed pods in your cluster.