Kubernetes Executor Task Leak
Apache Airflow version
2.8.1
If "Other Airflow 2 version" selected, which one?
No response
What happened?
The scheduler stops processing DAGs and moving tasks to the queued state. When looking at the scheduler in debug mode, the following information is displayed.
[2024-01-24T13:40:15.828+0000] {scheduler_job_runner.py:1092} DEBUG - Executor full, skipping critical section
[2024-01-24T13:40:15.828+0000] {base_executor.py:217} DEBUG - 32 running task instances
[2024-01-24T13:40:15.829+0000] {base_executor.py:218} DEBUG - 0 in queue
[2024-01-24T13:40:15.829+0000] {base_executor.py:219} DEBUG - 0 open slots
We noticed that a fix was addressed in https://github.com/apache/airflow/pull/36240, however we are still seeing the same issue.
We are utilizing the Airflow Helm chart version 1.10, and we have the same issue happening in multiple environments. Two environments have parallelism set to 32 with 1 scheduler running; the other has 3 schedulers, all with parallelism 32.
What you think should happen instead?
When a task is complete it should release the slot.
How to reproduce
Currently it seems to just be a matter of time: after the scheduler has been running for a certain period, the slots fill up with already-completed tasks.
Operating System
Debian GNU/Linux 12
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==8.16.0
apache-airflow-providers-celery==3.5.1
apache-airflow-providers-cncf-kubernetes==7.13.0
apache-airflow-providers-common-io==1.2.0
apache-airflow-providers-common-sql==1.10.0
apache-airflow-providers-docker==3.9.1
apache-airflow-providers-elasticsearch==5.3.1
apache-airflow-providers-ftp==3.7.0
apache-airflow-providers-google==10.13.1
apache-airflow-providers-grpc==3.4.1
apache-airflow-providers-hashicorp==3.6.1
apache-airflow-providers-http==4.8.0
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-microsoft-azure==8.5.1
apache-airflow-providers-mysql==5.5.1
apache-airflow-providers-odbc==4.4.0
apache-airflow-providers-openlineage==1.4.0
apache-airflow-providers-postgres==5.10.0
apache-airflow-providers-redis==3.6.0
apache-airflow-providers-sendgrid==3.4.0
apache-airflow-providers-sftp==4.8.1
apache-airflow-providers-slack==8.5.1
apache-airflow-providers-snowflake==5.2.1
apache-airflow-providers-sqlite==3.7.0
apache-airflow-providers-ssh==3.10.0
Deployment
Official Apache Airflow Helm Chart
Deployment details
Deployed via the Helm chart (1.10) to Azure AKS. We build our own image FROM apache/airflow:2.8.1-python3.11 with the required packages/DAGs copied in. The process is synced with an ArgoCD deployment pipeline.
Anything else?
This problem for the most part occurs daily. We have a test instance with only 5 DAGs that run once every hour, and we are still seeing the issue.
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
Looking over the logs I see two different outcomes. When I restart the pods I get the following:
[2024-01-25T13:20:23.100+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_parquet_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T09:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T09:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T09:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_parquet_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T00:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__azure_blob__data_platform_meta__test_trading_partner', task_id='check_new_files', run_id='scheduled__2024-01-25T00:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T03:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T00:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.102+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__azure_blob__data_platform_meta__test_trading_partner', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T03:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.102+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T04:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.102+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_parquet_feed', task_id='check_new_files', run_id='scheduled__2024-01-24T21:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.102+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T08:00:00+00:00', try_number=1, map_index=-1)
However after that I never get the success event:
[2024-01-25T14:00:24.228+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_zip_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.228+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_csv_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.228+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_parquet_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.229+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.229+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__data_platform_meta__test_trading_partner', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.236+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__azure_blob__superhero__blob_battles_zip_feed.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.236+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__azure_blob__superhero__blob_battles_csv_feed.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.236+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__azure_blob__superhero__blob_battles_parquet_feed.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.237+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__sftp__superhero__sftp_battles_csv_feed.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.237+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__azure_blob__data_platform_meta__test_trading_partner.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.243+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat
[2024-01-25T14:00:24.243+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat
[2024-01-25T14:00:24.243+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat
[2024-01-25T14:00:24.243+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat
[2024-01-25T14:00:24.244+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat
It seems like whenever our executor checks the state of the task instance, it is not being updated from the database.
Looking at the database entry, it's clearly marked as success.
However, it is listed as running and stuck in the kubernetes_executor:
[2024-01-25T15:04:16.609+0000] {kubernetes_executor.py:387} DEBUG - self.running: {TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T14:00:00+00:00', try_number=1, map_index=-1)
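For reference, this is roughly how we double-checked the mismatch: a minimal sketch, assuming a Python shell inside the scheduler pod and that the Airflow ORM can reach the metadata DB (the dag/task/run identifiers are just the ones from the log line above).

```python
# Rough check: compare what the metadata DB says with what the executor
# still holds in self.running for the stuck TaskInstanceKey above.
from airflow.models import TaskInstance
from airflow.utils.session import create_session

with create_session() as session:
    ti = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "acq__sftp__superhero__sftp_battles_csv_feed",
            TaskInstance.task_id == "check_new_files",
            TaskInstance.run_id == "scheduled__2024-01-25T14:00:00+00:00",
        )
        .one_or_none()
    )
    # The DB reports "success" here, while the executor still counts the task as running.
    print(ti.state if ti else "task instance not found")
```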
Are you seeing this issue when you run Airflow with a single scheduler? Can you share the details to reproduce it?
This requires triaging. Meanwhile, you can bump up the parallelism configuration to a higher number to beat the leak, or restart the scheduler after a certain number of iterations to reset these values.
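For example, as env overrides in the chart's values.yaml; the exact numbers here are only placeholders, not recommendations:

```yaml
env:
  # Placeholder values - size these to your own pools/workload.
  - name: AIRFLOW__CORE__PARALLELISM          # more executor slots, so leaked slots take longer to exhaust
    value: '128'
  - name: AIRFLOW__SCHEDULER__NUM_RUNS        # scheduler exits after this many loops and is restarted by k8s
    value: '5000'
```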
> Are you seeing this issue when you run Airflow with a single scheduler? Can you share the details to reproduce it?
>
> This requires triaging. Meanwhile, you can bump up the parallelism configuration to a higher number to beat the leak, or restart the scheduler after a certain number of iterations to reset these values.
Yes, this is when running on a single scheduler. We are utilizing the helm chart and only overriding the following values in values.yaml.
images:
  airflow:
    repository: <redacted>
    tag: branch-70e99939
    pullPolicy: Always
labels:
  azure.workload.identity/use: 'true'
airflowPodAnnotations:
  azure.workload.identity/client-id: <redacted>
env:
  - name: AIRFLOW__CORE__TEST_CONNECTION
    value: Enabled
  - name: AIRFLOW__LOGGING__REMOTE_LOGGING
    value: 'True'
  - name: AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID
    value: wasb_airlow_logs
  - name: AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER
    value: wasb-airflow-logging
  - name: AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE
    value: '4'
  - name: ENVIRONMENT
    value: dev
  - name: LOG_LEVEL
    value: DEBUG
  - name: AIRFLOW__SMTP__SMTP_MAIL_TO
    value: <redacted>
  - name: AIRFLOW__SMTP__SMTP_HOST
    value: <redacted>
  - name: AIRFLOW__SMTP__SMTP_PORT
    value: '25'
  - name: AIRFLOW__SMTP__SMTP_STARTTLS
    value: 'False'
  - name: AIRFLOW__SMTP__SMTP_SSL
    value: 'False'
  - name: AIRFLOW__SMTP__SMTP_MAIL_FROM
    value: <redacted>
  - name: AIRFLOW__SCHEDULER__USE_ROW_LEVEL_LOCKING
    value: 'False'
ingress:
  web:
    enabled: true
    pathType: Prefix
    hosts:
      - name: dev-airflow.privatelink.eastus2.azmk8s.io
        tls:
          enabled: true
          secretName: dev-airflow-tls
    ingressClassName: nginx
    annotations:
      cert-manager.io/cluster-issuer: aks-ca-cluster-issuer
workers:
  serviceAccount:
    annotations:
      azure.workload.identity/client-id: <redacted>
  resources:
    requests:
      cpu: '0.5'
      memory: 128Mi
    limits:
      cpu: '1'
      memory: 512Mi
executor: KubernetesExecutor
allowPodLaunching: true
webserver:
  replicas: 3
  podDisruptionBudget:
    enabled: true
    config:
      maxUnavailable: 1
  serviceAccount:
    annotations:
      azure.workload.identity/client-id: <redacted>
  webserverConfigConfigMapName: webserverconfig
volumes:
  - name: secrets-store-inline
    csi:
      driver: secrets-store.csi.k8s.io
      readOnly: true
      volumeAttributes:
        secretProviderClass: airflow-aks-secrets
volumeMounts:
  - name: secrets-store-inline
    mountPath: /mnt/secrets-store
    readOnly: true
dags:
  persistence:
    enabled: false
createUserJob:
  useHelmHooks: false
  applyCustomEnv: false
extraEnvFrom: |-
  - secretRef:
      name: airflow-azure-oauth
migrateDatabaseJob:
  useHelmHooks: false
  applyCustomEnv: false
  jobAnnotations:
    argocd.argoproj.io/hook: Sync
postgresql:
  enabled: false
metadataConnection:
  sslmode: require
pgbouncer:
  enabled: true
data:
  metadataSecretName: postgresql-connection-url
webserverSecretKeySecretName: airflow-webserver-secret-key
fernetKeySecretName: airflow-fernet-key
triggerer:
  replicas: 1
  serviceAccount:
    annotations:
      azure.workload.identity/client-id: <redacted>
scheduler:
  replicas: 1
  serviceAccount:
    annotations:
      azure.workload.identity/client-id: <redacted>
  args: ["bash", "-c", "exec airflow scheduler --verbose"]
config:
  kubernetes_executor:
    namespace: orch-dataplatform
  webserver:
    base_url: https://dev-airflow.privatelink.eastus2.azmk8s.io
We have been seeing this issue basically ever since we upgraded from 2.7.3 -> 2.8.0 (and now on 2.8.1)
Also, we are building our own Airflow image from the official base image and copying our DAGs into it. Dockerfile:
FROM apache/airflow:2.8.1-python3.11
USER root
RUN apt-get update && apt-get install -y --no-install-recommends grep procps
COPY --chown=airflow:root ./dags/ $AIRFLOW_HOME/dags/
USER airflow
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
requirements.txt
apache-airflow-providers-cncf-kubernetes==7.13.0
azure-identity==1.15.0
azure-storage-blob==12.19.0
azure-servicebus==7.11.4
opencensus-ext-azure==1.1.11
pydantic==2.4.2
pyhumps==3.8.0
I think we are running into the same problem with a similar setup (using the official Helm chart and upgraded from Airflow 2.7.3 to 2.8.1). After a while I stop seeing logs from the kubernetes_executor, and tasks are stuck in queued after a deferred trigger event was fired, i.e. they go queued -> running -> deferred -> queued (and are stuck there). Restarting the scheduler helps and the stuck tasks complete as expected.
I have not yet been able to identify the moment when the kubernetes_executor (presumably) stops working.
So we may actually be starting to see things work now. We were using an old version of the Helm chart, and after upgrading from 1.10 to 1.11 the executors just work. Keeping this open until confirmed, but that was the solution for us at least.
So after briefly seeing things work again, we are now seeing the issue again.
Single Scheduler running on the 1.11 Helm Chart, Airflow 2.8.1.
Looking into the latest occurrence, what is weird is that we are seeing the following logged event:
However, we also see it in the current running slots:
[2024-02-02T16:51:45.480+0000] {kubernetes_executor.py:387} DEBUG - self.running: {TaskInstanceKey(dag_id='acq__sftp__ohsu_partners_miah__prebill_daily_charges', task_id='get_execution_parameters', run_id='scheduled__2024-02-02T03:00:00+00:00', try_number=1, map_index=-1)}
This would seem to indicate that the code here: https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L705
is not finding the event in our database, correct? We have a single scheduler and set row-level locking to false, which means the query should be completely unaffected by any additional arguments.
Additional logging showing what this looks like as far as open slots:
@dirrao do I have to change labels in order to get a follow-up?
I am experiencing a similar problem in 2.7.3.
[2024-02-04T07:19:20.201+0000] {scheduler_job_runner.py:1081} DEBUG - Executor full, skipping critical section
[2024-02-04T07:19:20.203+0000] {base_executor.py:217} DEBUG - 32 running task instances
[2024-02-04T07:19:20.203+0000] {base_executor.py:218} DEBUG - 0 in queue
[2024-02-04T07:19:20.203+0000] {base_executor.py:219} DEBUG - 0 open slots
Running two scheduler replicas and getting a lot of messages like "scheduling was skipped, probably because the DAG record was locked".
I manually killed one of the scheduler pods and it helped to alleviate the issue. My schedulers had not been restarted for the last 5 days.
airflow-scheduler-686459bbff-g68xj 2/2 Running 0 19m
airflow-scheduler-686459bbff-gv9vc 2/2 Running 0 5d11h
Airflow config
[core]
dags_folder = /opt/airflow/dags
hostname_callable = airflow.utils.net.getfqdn
might_contain_dag_callable = airflow.utils.file.might_contain_dag_via_default_heuristic
default_timezone = utc
executor = KubernetesExecutor
auth_manager = airflow.auth.managers.fab.fab_auth_manager.FabAuthManager
parallelism = 32
max_active_tasks_per_dag = 16
dags_are_paused_at_creation = True
max_active_runs_per_dag = 16
# mp_start_method =
load_examples = false
plugins_folder = /opt/airflow/plugins
execute_tasks_new_python_interpreter = False
fernet_key = xxxxxxx=
donot_pickle = True
dagbag_import_timeout = 30.0
dagbag_import_error_tracebacks = True
dagbag_import_error_traceback_depth = 2
dag_file_processor_timeout = 50
task_runner = StandardTaskRunner
default_impersonation =
security =
unit_test_mode = False
enable_xcom_pickling = False
allowed_deserialization_classes = airflow\..*
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = True
dag_discovery_safe_mode = True
dag_ignore_file_syntax = regexp
default_task_retries = 0
default_task_retry_delay = 300
max_task_retry_delay = 86400
default_task_weight_rule = downstream
default_task_execution_timeout =
min_serialized_dag_update_interval = 30
compress_serialized_dags = False
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
check_slas = True
xcom_backend = airflow.models.xcom.BaseXCom
lazy_load_plugins = True
lazy_discover_providers = True
hide_sensitive_var_conn_fields = True
sensitive_var_conn_names =
default_pool_task_slot_count = 512
max_map_length = 1024
daemon_umask = 0o077
# dataset_manager_class =
# dataset_manager_kwargs =
database_access_isolation = False
# internal_api_url =
test_connection = Disabled
colored_console_log = True
remote_logging = True
[database]
alembic_ini_file_path = alembic.ini
sql_alchemy_conn = postgresql+psycopg2://.....
# sql_alchemy_engine_args =
sql_engine_encoding = utf-8
# sql_engine_collation_for_ids =
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
sql_alchemy_schema =
# sql_alchemy_connect_args =
load_default_connections = True
max_db_retries = 3
check_migrations = True
[logging]
base_log_folder = /opt/airflow/logs
remote_logging = True
remote_log_conn_id = aws_conn_s3
delete_local_logs = False
google_key_path =
remote_base_log_folder = s3://s3-paas/airflow-prod-logs/
remote_task_handler_kwargs =
encrypt_s3_logs = False
logging_level = DEBUG
celery_logging_level =
fab_logging_level = WARNING
logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
dag_processor_log_target = file
dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
secret_mask_adapter =
task_log_prefix_template =
log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
extra_logger_names =
worker_log_server_port = 8793
trigger_log_server_port = 8794
# interleave_timestamp_parser =
file_task_handler_new_folder_permissions = 0o775
file_task_handler_new_file_permissions = 0o664
celery_stdout_stderr_separation = False
delete_worker_pods = False
[metrics]
metrics_allow_list =
metrics_block_list =
statsd_on = True
statsd_host = airflow-statsd
statsd_port = 9125
statsd_prefix = airflow
stat_name_handler =
statsd_datadog_enabled = False
statsd_datadog_tags =
statsd_datadog_metrics_tags = True
# statsd_custom_client_path =
statsd_disabled_tags = job_id,run_id
statsd_influxdb_enabled = False
otel_on = False
otel_host = localhost
otel_port = 8889
otel_prefix = airflow
otel_interval_milliseconds = 60000
otel_debugging_on = False
otel_ssl_active = False
[secrets]
backend =
backend_kwargs =
use_cache = False
cache_ttl_seconds = 900
[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080
[debug]
fail_fast = False
[api]
enable_experimental_api = False
auth_backends = airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
maximum_page_limit = 100
fallback_page_limit = 100
google_oauth2_audience =
google_key_path =
access_control_allow_headers =
access_control_allow_methods =
access_control_allow_origins =
enable_xcom_deserialize_support = False
[lineage]
backend =
[operators]
default_owner = airflow
default_deferrable = false
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
default_queue = default
allow_illegal_arguments = False
[webserver]
access_denied_message = Access is Denied
config_file = /opt/airflow/webserver_config.py
base_url = http://localhost:8080
default_ui_timezone = UTC
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
session_backend = database
web_server_master_timeout = 120
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
worker_refresh_interval = 6000
reload_on_plugin_change = False
secret_key =xxxxxxxx
workers = 4
worker_class = sync
access_logfile = -
error_logfile = -
access_logformat =
expose_config = False
expose_hostname = False
expose_stacktrace = False
dag_default_view = grid
dag_orientation = LR
grid_view_sorting_order = topological
log_fetch_timeout_sec = 5
log_fetch_delay_sec = 2
log_auto_tailing_offset = 30
log_animation_speed = 1000
hide_paused_dags_by_default = False
page_size = 100
navbar_color = #fff
default_dag_run_display_number = 25
enable_proxy_fix = True
proxy_fix_x_for = 1
proxy_fix_x_proto = 1
proxy_fix_x_host = 1
proxy_fix_x_port = 1
proxy_fix_x_prefix = 1
cookie_secure = False
cookie_samesite = Lax
default_wrap = False
x_frame_enabled = True
# analytics_tool =
# analytics_id =
show_recent_stats_for_completed_runs = True
update_fab_perms = True
session_lifetime_minutes = 43200
# instance_name =
instance_name_has_markup = False
auto_refresh_interval = 3
warn_deployment_exposure = True
audit_view_excluded_events = gantt,landing_times,tries,duration,calendar,graph,grid,tree,tree_data
# audit_view_included_events =
enable_swagger_ui = True
run_internal_api = False
auth_rate_limited = True
auth_rate_limit = 5 per 40 second
caching_hash_method = md5
show_trigger_form_if_no_params = False
rbac = True
[email]
email_backend = airflow.utils.email.send_email_smtp
email_conn_id = smtp_default
default_email_on_retry = True
default_email_on_failure = True
# subject_template =
# html_content_template =
# from_email =
ssl_context = default
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = true
smtp_ssl = true
smtp_user = [email protected]
smtp_password = [email protected]
smtp_port = 587
smtp_mail_from = [email protected]
smtp_timeout = 30
smtp_retry_limit = 5
[sentry]
sentry_on = false
sentry_dsn =
# before_send =
[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
local_task_job_heartbeat_sec = 0
num_runs = -1
scheduler_idle_sleep_time = 1
min_file_process_interval = 30
parsing_cleanup_interval = 60
stale_dag_threshold = 50
dag_dir_list_interval = 30
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 30
enable_health_check = False
scheduler_health_check_server_port = 8974
orphaned_tasks_check_interval = 300.0
child_process_log_directory = /opt/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
zombie_detection_interval = 10.0
catchup_by_default = True
ignore_first_depends_on_past_by_default = True
max_tis_per_query = 16
use_row_level_locking = True
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
schedule_after_task_execution = True
parsing_pre_import_modules = True
parsing_processes = 2
file_parsing_sort_mode = modified_time
standalone_dag_processor = False
max_callbacks_per_loop = 20
dag_stale_not_seen_duration = 600
use_job_schedule = True
allow_trigger_in_future = False
trigger_timeout_check_interval = 15
task_queued_timeout = 600.0
task_queued_timeout_check_interval = 120.0
allowed_run_id_pattern = ^[A-Za-z0-9_.~:+-]+$
logging_level = DEBUG
run_duration = 41460
statsd_host = airflow-statsd
statsd_on = True
statsd_port = 9125
statsd_prefix = airflow
[triggerer]
default_capacity = 1000
job_heartbeat_sec = 5
triggerer_health_check_threshold = 30
[sensors]
default_timeout = 604800
[aws]
# session_factory =
cloudwatch_task_handler_json_serializer = airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy
[aws_ecs_executor]
conn_id = aws_default
# region_name =
assign_public_ip = False
# cluster =
# container_name =
launch_type = FARGATE
platform_version = LATEST
# security_groups =
# subnets =
# task_definition =
max_run_task_attempts = 3
# run_task_kwargs =
[celery_kubernetes_executor]
kubernetes_queue = kubernetes
[celery]
celery_app_name = airflow.providers.celery.executors.celery_executor
worker_concurrency = 16
# worker_autoscale =
worker_prefetch_multiplier = 1
worker_enable_remote_control = true
broker_url = redis://redis:6379/0
# result_backend =
result_backend_sqlalchemy_engine_options =
flower_host = 0.0.0.0
flower_url_prefix = /
flower_port = 5555
flower_basic_auth =
sync_parallelism = 0
celery_config_options = airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
operation_timeout = 1.0
task_track_started = True
task_publish_max_retries = 3
worker_precheck = False
[celery_broker_transport_options]
# visibility_timeout =
# sentinel_kwargs =
[local_kubernetes_executor]
kubernetes_queue = kubernetes
[kubernetes_executor]
api_client_retry_configuration =
logs_task_metadata = False
pod_template_file = /opt/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = xyz.com/airflow-prod
worker_container_tag = a6a136ee
namespace = airflow
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
multi_namespace_mode_namespace_list =
in_cluster = True
# cluster_context =
# config_file =
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
verify_ssl = True
worker_pods_queued_check_interval = 60
ssl_ca_cert =
[dask]
cluster_address = 127.0.0.1:8786
tls_ca =
tls_cert =
tls_key =
[azure_remote_logging]
remote_wasb_log_container = airflow-logs
[kubernetes]
airflow_configmap = airflow-airflow-config
airflow_local_settings_configmap = airflow-airflow-config
multi_namespace_mode = False
namespace = airflow
pod_template_file = /opt/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = xyz.com/airflow-prod
worker_container_tag = a6a136ee
Able to capture a pretty good log of what we are seeing. Things were working fine yesterday between 09:00 - 17:00, then 18:00 came along and we started seeing the execution lock issue. No deployments happened at this time, the same DAGs were running, and there are no error logs as far as we can tell just yet.
After a couple of weeks with no responses on this post we just decided to revert back to 2.7.2 and the issue is gone. Down the road we will investigate being able to use 2.8.x on something like Astronomer, but we can definitely confirm that the change from 2.7 -> 2.8 caused this issue.
It looks like the scheduler or the kubernetes_executor cannot recover from communication issues with Kubernetes. I've collected a few hours of logging after a restart of the scheduler, and the problems seem to occur after the following lines:
[2024-02-22T10:24:59.431+0000] {kubernetes_executor_utils.py:121} ERROR - Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 710, in _error_catcher
yield
File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1073, in read_chunked
self._update_chunk_length()
File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1008, in _update_chunk_length
raise InvalidChunkLength(self, line) from None
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 112, in run
self.resource_version = self._run(
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 168, in _run
for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 165, in stream
for line in iter_resp_lines(resp):
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
for seg in resp.stream(amt=None, decode_content=False):
File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 933, in stream
yield from self.read_chunked(amt, decode_content=decode_content)
File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1061, in read_chunked
with self._error_catcher():
File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 727, in _error_catcher
raise ProtocolError(f"Connection broken: {e!r}", e) from e
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
Process KubernetesJobWatcher-3:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 710, in _error_catcher
yield
File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1073, in read_chunked
self._update_chunk_length()
File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1008, in _update_chunk_length
raise InvalidChunkLength(self, line) from None
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 112, in run
self.resource_version = self._run(
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 168, in _run
for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 165, in stream
for line in iter_resp_lines(resp):
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
for seg in resp.stream(amt=None, decode_content=False):
File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 933, in stream
yield from self.read_chunked(amt, decode_content=decode_content)
File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1061, in read_chunked
with self._error_catcher():
File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 727, in _error_catcher
raise ProtocolError(f"Connection broken: {e!r}", e) from e
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
[2024-02-22T10:24:59.568+0000] {kubernetes_executor_utils.py:359} ERROR - Error while health checking kube watcher process for namespace airflow. Process died for unknown reasons
[2024-02-22T10:24:59.586+0000] {kubernetes_executor_utils.py:157} INFO - Event: and now my watch begins starting at resource_version: 0
After that, tasks are stuck in queued and I don't see any more lines of the kind
[2024-02-22T10:16:50.260+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey
I can only recover from that state by clearing all scheduled and queued tasks and restarting the scheduler. I wasn't able to dig deeper into the kubernetes_executor yet, but there seem to be quite a few changes between 2.7.3 and 2.8.1. That would be my first guess for the origin of this.
Hi! I had the same error with 2.7.3, described in https://github.com/apache/airflow/issues/36478, and I tested it on 2.8.4 and the bug still exists.
Update: I have the same error with 1 scheduler on Airflow 2.8.4. But I think the error may also be in the Kubernetes provider. Libs:
apache-airflow == 2.8.4
dbt-core == 1.7.11
dbt-snowflake == 1.7.3
apache-airflow[statsd]
facebook-business == 19.0.2
google-ads == 22.1.0
twitter-ads == 11.0.0
acryl-datahub-airflow-plugin == 0.10.2.3
acryl-datahub[dbt] == 0.10.2.3
checksumdir
filelock
openpyxl
cronsim
apache-airflow-providers-snowflake
apache-airflow-providers-cncf-kubernetes == 8.0.1
apache-airflow-providers-apache-kafka == 1.3.1
apache-airflow-providers-slack == 8.6.1
apache-airflow-providers-amazon
kubernetes
snowplow_analytics_sdk
elementary-data == 0.14.1
@smhood I have downgraded the Airflow version to 2.7.2, but the issue still exists...
apache-airflow == 2.7.2
dbt-core == 1.7.11
dbt-snowflake == 1.7.3
apache-airflow[statsd]
facebook-business == 19.0.2
google-ads == 22.1.0
twitter-ads == 11.0.0
acryl-datahub-airflow-plugin == 0.10.2.3
acryl-datahub[dbt] == 0.10.2.3
checksumdir
filelock
openpyxl
cronsim
apache-airflow-providers-snowflake
apache-airflow-providers-cncf-kubernetes == 8.0.1
apache-airflow-providers-apache-kafka == 1.3.1
apache-airflow-providers-slack == 8.6.1
apache-airflow-providers-amazon
kubernetes
snowplow_analytics_sdk
elementary-data == 0.14.1
Maybe the issue is inside the provider packages?
Found emails related to the same issue: https://www.mail-archive.com/[email protected]/msg309101.html
I found a workaround and some insights: if scheduler parallelism is less than the sum of all slots in the pools (e.g. parallelism 32 and 8 pools with 8 slots each) and all slots are used, the scheduler starts leaking slots after some time. Workaround: set parallelism higher than the total number of pool slots you have.
@crabio, were you able to find a solution? We are also facing the task leak issue in v2.6.3.
@paramjeet01 Not fully. We still have the slot leak, but we found a workaround:
- set AIRFLOW__CORE__PARALLELISM to 512+ to have free slots in the scheduler, or at least bigger than the sum of slots in all the pools that you have
- try setting AIRFLOW__SCHEDULER__NUM_RUNS to restart the scheduler from time to time; we are going to try this option with a 4 hour scheduler lifetime
- check that you don't have zombie pods in any of the namespaces that you are using for Airflow; we have some namespaces where, we think, the Airflow scheduler has no access to kill pods, and we end up with zombie succeeded tasks
@crabio I have updated my comments here: https://github.com/apache/airflow/issues/38968#issuecomment-2059521327. I was able to improve the performance and the tasks no longer have long queue durations.
I see similarities between the issue we are facing and the one you describe.
Airflow 2.8.4
We run one instance of the scheduler and we observe a list of completed tasks (usually cleared) like here: https://github.com/apache/airflow/issues/33402
Config:
- core parallelism = 32
- default pool slots = 128
At 3 AM more tasks were supposed to run, but nothing like that happened.
In the morning we ended up having no running tasks and many in the queued state.
@crabio Could you please post steps to reproduce the issue? Then I could spend a little bit more time understanding it.
@aru-trackunit Sure
- run Airflow with the Kubernetes Executor and 1 scheduler
- run some tasks in your default namespace (maybe this is not required) and run tasks in another namespace where Airflow has access to create, watch and delete pods
- run Airflow with active tasks for some time; you need to have running tasks for a while to catch the slot leak
Maybe the tasks in another namespace are not required, because we faced this issue before we started using multiple namespaces. I think @paramjeet01 and @smhood have 1 namespace.
@crabio, yes, we run in a single namespace.
I am seeing the issue in a single namespace. The scheduler fails to remove pods in the Completed state, and after the running slots reach 32, newly scheduled tasks aren't getting queued.
airflow==2.8.4
apache-airflow-providers-cncf-kubernetes==7.14.0
kubernetes==23.6.0
urllib3==2.0.7
KubernetesJobWatcher failed a couple of times but was able to restart.
In the logs below, the watcher running on PID 2034 failed. On the next sync of the executor, it was able to start back up with PID 3740.
[2024-04-18T23:29:34.285+0000] [2034:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:121} ERROR - Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 710, in _error_catcher
yield
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 1073, in read_chunked
self._update_chunk_length()
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 1008, in _update_chunk_length
raise InvalidChunkLength(self, line) from None
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 112, in run
self.resource_version = self._run(
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 168, in _run
for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
for line in iter_resp_lines(resp):
File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
for seg in resp.stream(amt=None, decode_content=False):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 933, in stream
yield from self.read_chunked(amt, decode_content=decode_content)
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 1061, in read_chunked
with self._error_catcher():
File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 727, in _error_catcher
raise ProtocolError(f"Connection broken: {e!r}", e) from e
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
[2024-04-18T23:29:35.067+0000] [ 7:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:449} DEBUG - Syncing KubernetesExecutor
[2024-04-18T23:29:35.067+0000] [ 7:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:359} ERROR - Error while health checking kube watcher process for namespace astronomer-contractual-po-prod. Process died for unknown reasons
[2024-04-18T23:29:35.078+0000] [3740:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:157} INFO - Event: and now my watch begins starting at resource_version: 0
It was working fine for a minute; it was reporting back the pod changes.
[2024-04-18T23:30:50.605+0000] [3740:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:170} DEBUG - Event: strategizer-monitor-check-strategizer-bid-statuses-lz2ww1cv had an event of type DELETED
[2024-04-18T23:30:50.605+0000] [3740:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:256} INFO - Skipping event for Succeeded pod strategizer-monitor-check-strategizer-bid-statuses-lz2ww1cv - event for this pod already sent to executor
After this the watcher went silent; there were no more logs from PID 3740. The KubernetesExecutor.running set kept increasing:
[2024-04-18T23:40:01.059+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:217} DEBUG - 1 running task instances
[2024-04-18T23:40:01.060+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:218} DEBUG - 0 in queue
[2024-04-18T23:40:01.060+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:219} DEBUG - 31 open slots
...
[2024-04-19T13:24:44.721+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:217} DEBUG - 32 running task instances
[2024-04-19T13:24:44.721+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:218} DEBUG - 0 in queue
[2024-04-19T13:24:44.721+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:219} DEBUG - 0 open slots
I confirmed that the PID: 3740 is still running.
/usr/local/airflow$ ps -ef
UID PID PPID C STIME TIME CMD
astro 1 0 0 Apr18 00:00:00 tini -- /entrypoint bash -c exec airflow scheduler
astro 7 1 4 Apr18 00:37:17 /usr/local/bin/python /usr/local/bin/airflow scheduler
astro 24 7 0 Apr18 00:01:06 /usr/local/bin/python /usr/local/bin/airflow scheduler
astro 33 7 0 Apr18 00:00:41 /usr/local/bin/python /usr/local/bin/airflow scheduler
astro 44 7 1 Apr18 00:12:28 airflow scheduler -- DagFileProcessorManager
astro 3740 7 0 Apr18 00:00:00 /usr/local/bin/python /usr/local/bin/airflow scheduler <=== KubernetesJobWatcher
KubernetesJobWatcher is stuck in the generator self._pod_events(kube_client=kube_client, query_kwargs=kwargs)
https://github.com/apache/airflow/blob/cead3da4a6f483fa626b81efd27a24dcb5a36ab0/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py#L168
waiting for a stream from the kubernetes Watch.stream() and urllib3 HTTPResponse.stream():
https://github.com/kubernetes-client/python/blob/98cd2251152fcdbfa6de24c85384887b0999a94c/kubernetes/base/watch/watch.py#L56
https://github.com/urllib3/urllib3/blob/56f01e088dc006c03d4ee6ea9da4ab810f1ed700/src/urllib3/response.py#L914
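One mitigation we may try, assuming the watcher really is blocked on that HTTP stream: set a client-side request timeout through the existing kubernetes_executor.kube_client_request_args option, so a silently broken watch connection errors out (and the watch loop can reconnect) instead of hanging forever. A sketch as an env override; the 50/50 second values are just an example:

```yaml
env:
  # Assumption: a read timeout makes a dead watch fail fast so the watcher can recover,
  # instead of blocking indefinitely in resp.stream().
  - name: AIRFLOW__KUBERNETES_EXECUTOR__KUBE_CLIENT_REQUEST_ARGS
    value: '{"_request_timeout": [50, 50]}'
```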