
Scheduler goes down for 1-2 minutes every 10 minutes as completed pods increase in EKS

dviru opened this issue 3 years ago · 5 comments

Apache Airflow version

2.2.4 (latest released)

What happened

Hi Team, I am using Airflow 2.2.4, deployed on an AWS EKS cluster. I noticed that every 5-10 minutes a "scheduler down" message appears on the Airflow UI. When I checked the scheduler log, I saw a lot of statements like the one below.

[2022-03-21 08:21:21,640] {kubernetes_executor.py:729} INFO - Attempting to adopt pod sampletask.05b6f567b4a64bd5beb16e526ba94d7a

The statement above is printed for every completed pod that exists in EKS, but it repeats multiple times and also invokes the PATCH API.

As I understand it, the code below pulls the details of all completed pods from the EKS cluster every time and invokes the PATCH API on each completed pod. For 1,000 completed pods this activity finishes in about 1 minute; for 7,000 completed pods it takes 3-5 minutes, which is why the scheduler goes down.

[screenshot: the executor code that lists completed pods and patches each of them]
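
For context, the adoption pass the screenshot refers to behaves roughly like the sketch below. This is a simplified, hypothetical illustration using the kubernetes Python client, not Airflow's exact executor code; the function name and label selector are assumptions. It shows why the work grows linearly with the number of completed pods left in the cluster: one LIST, then one PATCH per pod.

# Hypothetical, simplified sketch of the completed-pod adoption pass
# (not Airflow's exact code); assumes the kubernetes Python client.
from kubernetes import client, config

def adopt_completed_pods(namespace: str, scheduler_job_id: str) -> None:
    config.load_incluster_config()  # running inside the cluster
    v1 = client.CoreV1Api()

    # One LIST call returns every completed worker pod still present in the cluster.
    completed = v1.list_namespaced_pod(
        namespace,
        field_selector="status.phase=Succeeded",
        label_selector="kubernetes_executor=True",  # assumed worker-pod label
    )

    # Then each pod gets its own PATCH call to re-label it with the new
    # scheduler job id. With delete_worker_pods = False this loop grows
    # linearly with the number of completed pods, so 7,000+ pods mean
    # 7,000+ PATCH calls inside the scheduler loop.
    for pod in completed.items:
        pod.metadata.labels["airflow-worker"] = scheduler_job_id
        v1.patch_namespaced_pod(
            name=pod.metadata.name,
            namespace=pod.metadata.namespace,
            body=pod,
        )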

What you think should happen instead

The scheduler stays healthy when we set "delete_worker_pods = True", but when we set delete_worker_pods = False and the completed pod count grows to 7,000-10,000, the scheduler goes down.

The scheduler should stay healthy irrespective of how many completed pods exist in the EKS cluster.

How to reproduce

Deploy Airflow in a k8s cluster and set "delete_worker_pods = False". Once the completed pod count reaches 7,000 to 10,000, you will be able to see this issue.
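
For reference, a minimal sketch of the relevant configuration. The section name below is the one used in 2.2.x; later releases moved the option to the [kubernetes_executor] section, so adjust accordingly:

# airflow.cfg (2.2.x section name)
[kubernetes]
delete_worker_pods = False

# or equivalently, as an environment variable on the scheduler container:
AIRFLOW__KUBERNETES__DELETE_WORKER_PODS=False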

Operating System

OS: Debian GNU/Linux, VERSION: 10

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

dviru avatar Mar 30 '22 03:03 dviru

Thanks for opening your first issue here! Be sure to follow the issue template!

boring-cyborg[bot] avatar Mar 30 '22 03:03 boring-cyborg[bot]

cc: @dstandish -> what we talked about :)

potiuk avatar Mar 30 '22 10:03 potiuk

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Kindly recheck the report against the latest Airflow version and let us know if the issue is reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.

github-actions[bot] avatar Jul 05 '23 07:07 github-actions[bot]

This issue has been closed because it has not received a response from the issue author.

github-actions[bot] avatar Aug 04 '23 18:08 github-actions[bot]

We are seeing this issue in Airflow 2.3.3. I strongly believe the issue is also present in the latest Airflow version, 2.9.1, as per the latest code; I don't see any improvements in watcher performance between 2.3.3 and 2.9.1. The primary cause is that the Kubernetes pod watcher is not fast enough to keep up with the Kubernetes event rate. This leads to watcher failures/restarts, after which adopt_completed_pods takes over the completed pods. That adoption can take a couple of minutes, causing a delayed scheduler heartbeat, then scheduler liveness failures, and then a scheduler pod restart.
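
To make that failure mode concrete, here is a minimal sketch of the watch-then-resync pattern described above, written against the kubernetes Python client. It is an illustration of the general pattern, not Airflow's actual watcher implementation, and the label selector is an assumption: when the watcher falls behind the event rate, the stored resource version expires (HTTP 410), the stream dies, and a full re-list / adoption pass has to run.

# Illustrative sketch of the pod-watch / resync pattern (not Airflow's actual code);
# assumes the kubernetes Python client.
from kubernetes import client, config, watch

def watch_worker_pods(namespace: str) -> None:
    config.load_incluster_config()
    v1 = client.CoreV1Api()
    w = watch.Watch()
    resource_version = None

    while True:
        try:
            # If pod events arrive faster than this loop consumes them, the stored
            # resource_version eventually expires on the API server side.
            for event in w.stream(
                v1.list_namespaced_pod,
                namespace=namespace,
                label_selector="kubernetes_executor=True",  # assumed worker-pod label
                resource_version=resource_version,
                timeout_seconds=60,
            ):
                resource_version = event["object"].metadata.resource_version
                # handle ADDED / MODIFIED / DELETED pod events here
        except client.exceptions.ApiException as err:
            if err.status == 410:  # "Gone": resource version too old
                resource_version = None  # forces a fresh LIST and an adoption pass
            else:
                raise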

dirrao avatar May 02 '24 09:05 dirrao

I have had no luck reproducing this because my cluster gets destroyed by OOM when I get to 2000/3000 tasks. I pruned my system but still run into this OOM.

Edit: I have since seen the scheduler restarting.

ephraimbuddy avatar May 29 '24 05:05 ephraimbuddy

[2024-05-29T06:28:38.937+0000] {process_utils.py:132} INFO - Sending Signals.SIGTERM to group 97. PIDs of all processes in the group: [7350, 7363, 97]
[2024-05-29T06:28:38.939+0000] {process_utils.py:87} INFO - Sending the signal Signals.SIGTERM to group 97
[2024-05-29T06:28:42.204+0000] {settings.py:425} DEBUG - Disposing DB connection pool (PID 7350)
[2024-05-29T06:28:42.212+0000] {settings.py:425} DEBUG - Disposing DB connection pool (PID 7363)
[2024-05-29T06:28:42.217+0000] {process_utils.py:263} INFO - Waiting up to 5 seconds for processes to exit...
[2024-05-29T06:28:42.237+0000] {process_utils.py:80} INFO - Process psutil.Process(pid=7363, status='terminated', started='06:28:38') (7363) terminated with exit code None
[2024-05-29T06:28:42.237+0000] {process_utils.py:80} INFO - Process psutil.Process(pid=97, status='terminated', exitcode=0, started='06:18:49') (97) terminated with exit code 0
[2024-05-29T06:28:42.237+0000] {process_utils.py:80} INFO - Process psutil.Process(pid=7350, status='terminated', started='06:28:36') (7350) terminated with exit code None
[2024-05-29T06:28:42.238+0000] {kubernetes_executor.py:740} INFO - Shutting down Kubernetes executor
[2024-05-29T06:28:42.238+0000] {kubernetes_executor.py:742} DEBUG - Flushing task_queue...
[2024-05-29T06:28:42.238+0000] {scheduler_job_runner.py:880} ERROR - Exception when executing Executor.end on KubernetesExecutor(parallelism=32)
Traceback (most recent call last):
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 856, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 989, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1071, in _do_scheduling
    callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
  File "/opt/airflow/airflow/utils/retries.py", line 89, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 435, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 368, in iter
    result = action(retry_state)
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 390, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/opt/airflow/airflow/utils/retries.py", line 98, in wrapped_function
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1415, in _schedule_all_dag_runs
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1415, in <listcomp>
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1433, in _schedule_dag_run
    dag_model = DM.get_dagmodel(dag_run.dag_id, session)
  File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/dag.py", line 3737, in get_dagmodel
    return session.get(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2853, in get
    return self._get_impl(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2975, in _get_impl
    return db_load_fn(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
    session.execute(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1720, in execute
    result = compile_state_cls.orm_setup_cursor_result(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/context.py", line 349, in orm_setup_cursor_result
    return loading.instances(result, querycontext)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", line 69, in instances
    *[
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", line 70, in <listcomp>
    query_entity.row_processor(context, cursor)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/context.py", line 2631, in row_processor
    _instance = loading._instance_processor(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", line 796, in _instance_processor
    prop.create_row_processor(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/interfaces.py", line 658, in create_row_processor
    strat.create_row_processor(
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 263, in _exit_gracefully
    sys.exit(os.EX_OK)
SystemExit: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 878, in _execute
    executor.end()
  File "/opt/airflow/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 743, in end
    self._flush_task_queue()
  File "/opt/airflow/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 699, in _flush_task_queue
    self.log.debug("Executor shutting down, task_queue approximate size=%d", self.task_queue.qsize())
  File "<string>", line 2, in qsize
  File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 834, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[2024-05-29T06:28:42.247+0000] {process_utils.py:132} INFO - Sending Signals.SIGTERM to group 97. PIDs of all processes in the group: []
[2024-05-29T06:28:42.247+0000] {process_utils.py:87} INFO - Sending the signal Signals.SIGTERM to group 97
[2024-05-29T06:28:42.247+0000] {process_utils.py:101} INFO - Sending the signal Signals.SIGTERM to process 97 as process group is missing.
[2024-05-29T06:28:42.247+0000] {scheduler_job_runner.py:886} INFO - Exited execute loop
[2024-05-29T06:28:42.258+0000] {cli_action_loggers.py:94} DEBUG - Calling callbacks: []
[2024-05-29T06:28:42.260+0000] {settings.py:425} DEBUG - Disposing DB connection pool (PID 11)

ephraimbuddy avatar May 29 '24 06:05 ephraimbuddy

Liveness:  exec [sh -c CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \
             airflow jobs check --job-type SchedulerJob --local
           ] delay=10s timeout=20s period=60s #success=1 #failure=5
Startup:   exec [sh -c CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \
             airflow jobs check --job-type SchedulerJob --local
           ] delay=0s timeout=20s period=10s #success=1 #failure=6
Events:
  Type     Reason     Age                   From     Message
  ----     ------     ----                  ----     -------
  Warning  Unhealthy  73s (x90 over 7h19m)  kubelet  Liveness probe failed: command "sh -c CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \\\nairflow jobs check --job-type SchedulerJob --local\n" timed out

ephraimbuddy avatar May 29 '24 07:05 ephraimbuddy

@dviru, why would you set the pods not to be deleted? This leads to OOM because the pods occupy some space. Just trying to understand your needs, and whether we should add another config to cap the maximum number of completed pods allowed to remain in the deployment.

ephraimbuddy avatar May 29 '24 09:05 ephraimbuddy

This also seems related: https://github.com/apache/airflow/issues/38968

sunank200 avatar May 31 '24 07:05 sunank200

@dviru, why would you set the pods not to be deleted? This leads to OOM because the pods occupy some space. Just trying to understand your needs, and whether we should add another config to cap the maximum number of completed pods allowed to remain in the deployment.

@ephraimbuddy Sometimes I want the pods to remain so I can check the pod logs. But I am not seeing this issue in 2.7.3, even though we have 6,000-7,000 completed pods in the cluster.

dviru avatar Jun 07 '24 08:06 dviru

@dviru, why would you set the pods not to be deleted? This leads to OOM because the pods occupy some space. Just trying to understand your needs, and whether we should add another config to cap the maximum number of completed pods allowed to remain in the deployment.

@ephraimbuddy Sometimes I want the pods to remain so I can check the pod logs. But I am not seeing this issue in 2.7.3, even though we have 6,000-7,000 completed pods in the cluster.

This problem can be solved by using remote logging. It is not a good idea to keep 7,000 completed pods in your cluster.
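
For anyone with the same requirement, a minimal remote-logging sketch so task logs stay readable after the worker pods are deleted; the bucket path and connection id below are placeholders:

# airflow.cfg
[logging]
remote_logging = True
remote_base_log_folder = s3://my-airflow-logs/logs   ; placeholder bucket/prefix
remote_log_conn_id = aws_default                     ; placeholder connection id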

ephraimbuddy avatar Jun 07 '24 13:06 ephraimbuddy