
Kubernetes Executor Task Leak

Open smhood opened this issue 1 year ago • 37 comments

Apache Airflow version

2.8.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The scheduler stops processing DAGs and moving tasks to the queued state. When looking at the scheduler in debug mode, the following information is displayed.

[2024-01-24T13:40:15.828+0000] {scheduler_job_runner.py:1092} DEBUG - Executor full, skipping critical section
[2024-01-24T13:40:15.828+0000] {base_executor.py:217} DEBUG - 32 running task instances
[2024-01-24T13:40:15.829+0000] {base_executor.py:218} DEBUG - 0 in queue
[2024-01-24T13:40:15.829+0000] {base_executor.py:219} DEBUG - 0 open slots

We noticed that a fix was merged in https://github.com/apache/airflow/pull/36240; however, we are still seeing the same issue.

We are utilizing the Airflow Helm chart version 1.10, and the same issue is happening in multiple environments. Two environments have parallelism set to 32 with 1 scheduler running; the other has 3 schedulers, all with parallelism 32.

What you think should happen instead?

When a task is complete, it should release its executor slot.
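
For reference, a paraphrased sketch of the bookkeeping behind the DEBUG lines above (not the exact BaseExecutor/KubernetesExecutor source): open slots are parallelism minus everything the executor still believes is running or queued, and a slot is only released when a terminal event removes the task's key from the in-memory running set. The leak reported here is that removal never happening. The names below are simplified stand-ins, not the real classes.

# Simplified, hypothetical stand-ins for the executor's internal bookkeeping.
parallelism = 32
running = {("dag_a", "task_1", "run_1", 1)}   # keys the executor still considers active
queued_tasks = {}                             # keys waiting to be sent to Kubernetes

def open_slots() -> int:
    return parallelism - len(running) - len(queued_tasks)

def handle_terminal_event(key, state) -> None:
    if state in ("success", "failed"):
        running.discard(key)                  # this removal is what frees the slot

print(open_slots())                           # 31 while the task is tracked as running
handle_terminal_event(("dag_a", "task_1", "run_1", 1), "success")
print(open_slots())                           # 32 once the key leaves the running set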

How to reproduce

Currently it seems to be purely a matter of time: after a certain period of running, the slots fill up with completed tasks.

Operating System

Debian GNU/Linux 12

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.16.0
apache-airflow-providers-celery==3.5.1
apache-airflow-providers-cncf-kubernetes==7.13.0
apache-airflow-providers-common-io==1.2.0
apache-airflow-providers-common-sql==1.10.0
apache-airflow-providers-docker==3.9.1
apache-airflow-providers-elasticsearch==5.3.1
apache-airflow-providers-ftp==3.7.0
apache-airflow-providers-google==10.13.1
apache-airflow-providers-grpc==3.4.1
apache-airflow-providers-hashicorp==3.6.1
apache-airflow-providers-http==4.8.0
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-microsoft-azure==8.5.1
apache-airflow-providers-mysql==5.5.1
apache-airflow-providers-odbc==4.4.0
apache-airflow-providers-openlineage==1.4.0
apache-airflow-providers-postgres==5.10.0
apache-airflow-providers-redis==3.6.0
apache-airflow-providers-sendgrid==3.4.0
apache-airflow-providers-sftp==4.8.1
apache-airflow-providers-slack==8.5.1
apache-airflow-providers-snowflake==5.2.1
apache-airflow-providers-sqlite==3.7.0
apache-airflow-providers-ssh==3.10.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

Deployed via the Helm chart (1.10) to Azure AKS. We deploy our own image with the required packages/DAGs copied in, built FROM apache/airflow:2.8.1-python3.11. The process is synced with an ArgoCD deployment pipeline.

Anything else?

This problem occurs more or less daily. We have a test instance with only 5 DAGs that run once every hour, and we are still seeing the issue.

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

smhood avatar Jan 24 '24 14:01 smhood

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot] avatar Jan 24 '24 14:01 boring-cyborg[bot]

Looking over the logs I get two different outcomes.... when I restart the pods I get the following:

[2024-01-25T13:20:23.100+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_parquet_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T09:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T09:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T09:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_parquet_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T00:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__azure_blob__data_platform_meta__test_trading_partner', task_id='check_new_files', run_id='scheduled__2024-01-25T00:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T03:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T00:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.102+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__azure_blob__data_platform_meta__test_trading_partner', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T03:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.102+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T04:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.102+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_parquet_feed', task_id='check_new_files', run_id='scheduled__2024-01-24T21:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.102+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T08:00:00+00:00', try_number=1, map_index=-1)

However after that I never get the success event:

[2024-01-25T14:00:24.228+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_zip_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.228+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_csv_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.228+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_parquet_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.229+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.229+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__data_platform_meta__test_trading_partner', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.236+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__azure_blob__superhero__blob_battles_zip_feed.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.236+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__azure_blob__superhero__blob_battles_csv_feed.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.236+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__azure_blob__superhero__blob_battles_parquet_feed.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.237+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__sftp__superhero__sftp_battles_csv_feed.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.237+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__azure_blob__data_platform_meta__test_trading_partner.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.243+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat
[2024-01-25T14:00:24.243+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat
[2024-01-25T14:00:24.243+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat
[2024-01-25T14:00:24.243+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat
[2024-01-25T14:00:24.244+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat

smhood avatar Jan 25 '24 15:01 smhood

It seems like whenever our executor checks the state of the task instance, it is not being updated from the database.

Looking at the database entry, it's clearly marked as success (screenshot).

However, it's listed as running and stuck in the kubernetes_executor:

[2024-01-25T15:04:16.609+0000] {kubernetes_executor.py:387} DEBUG - self.running: {TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T14:00:00+00:00', try_number=1, map_index=-1)
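
For reference, one way to confirm the mismatch is to ask the metadata DB directly for the state of the key the executor still lists in self.running. A rough sketch, assuming the Airflow package and metadata DB connection are available (e.g. run inside the scheduler pod); the helper name is just for illustration:

from airflow.models import TaskInstance
from airflow.utils.session import create_session

def db_state(dag_id: str, task_id: str, run_id: str):
    # Returns the state stored in the metadata database for this task instance.
    with create_session() as session:
        ti = (
            session.query(TaskInstance)
            .filter_by(dag_id=dag_id, task_id=task_id, run_id=run_id)
            .one_or_none()
        )
        return ti.state if ti else None

print(db_state(
    "acq__sftp__superhero__sftp_battles_csv_feed",
    "check_new_files",
    "scheduled__2024-01-25T14:00:00+00:00",
))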

smhood avatar Jan 25 '24 16:01 smhood

Are you seeing this issue when you run Airflow with a single scheduler? Can you share the details to reproduce it?

This requires triaging. Meanwhile, you can bump up the parallelism configuration to a higher number to outpace the leak, or restart the scheduler after a certain number of iterations to reset these values.

dirrao avatar Jan 25 '24 16:01 dirrao

Are you seeing this issue when you run Airflow with a single scheduler? Can you share the details to reproduce it?

This requires triaging. Meanwhile, you can bump up the parallelism configuration to a higher number to outpace the leak, or restart the scheduler after a certain number of iterations to reset these values.

Yes, this is when running on a single scheduler. We are utilizing the helm chart and only overriding the following values in values.yaml.

images:
  airflow:
    repository:  <redacted>
    tag: branch-70e99939
    pullPolicy: Always
labels:
  azure.workload.identity/use: 'true'
airflowPodAnnotations:
  azure.workload.identity/client-id:  <redacted>
env:
  - name: AIRFLOW__CORE__TEST_CONNECTION
    value: Enabled
  - name: AIRFLOW__LOGGING__REMOTE_LOGGING
    value: 'True'
  - name: AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID
    value: wasb_airlow_logs
  - name: AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER
    value: wasb-airflow-logging
  - name: AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE
    value: '4'
  - name: ENVIRONMENT
    value: dev
  - name: LOG_LEVEL
    value: DEBUG
  - name: AIRFLOW__SMTP__SMTP_MAIL_TO
    value:  <redacted>
  - name: AIRFLOW__SMTP__SMTP_HOST
    value:  <redacted>
  - name: AIRFLOW__SMTP__SMTP_PORT
    value: '25'
  - name: AIRFLOW__SMTP__SMTP_STARTTLS
    value: 'False'
  - name: AIRFLOW__SMTP__SMTP_SSL
    value: 'False'
  - name: AIRFLOW__SMTP__SMTP_MAIL_FROM
    value:  <redacted>
  - name: AIRFLOW__SCHEDULER__USE_ROW_LEVEL_LOCKING
    value: 'False'
ingress:
  web:
    enabled: true
    pathType: Prefix
    hosts:
      - name: dev-airflow.privatelink.eastus2.azmk8s.io
        tls:
          enabled: true
          secretName: dev-airflow-tls
    ingressClassName: nginx
    annotations:
      cert-manager.io/cluster-issuer: aks-ca-cluster-issuer
workers:
  serviceAccount:
    annotations:
      azure.workload.identity/client-id:  <redacted>
  resources:
    requests:
      cpu: '0.5'
      memory: 128Mi
    limits:
      cpu: '1'
      memory: 512Mi
executor: KubernetesExecutor
allowPodLaunching: true
webserver:
  replicas: 3
  podDisruptionBudget:
    enabled: true
    config:
      maxUnavailable: 1
  serviceAccount:
    annotations:
      azure.workload.identity/client-id:  <redacted>
  webserverConfigConfigMapName: webserverconfig
volumes:
  - name: secrets-store-inline
    csi:
      driver: secrets-store.csi.k8s.io
      readOnly: true
      volumeAttributes:
        secretProviderClass: airflow-aks-secrets
volumeMounts:
  - name: secrets-store-inline
    mountPath: /mnt/secrets-store
    readOnly: true
dags:
  persistence:
    enabled: false
createUserJob:
  useHelmHooks: false
  applyCustomEnv: false
extraEnvFrom: |-
  - secretRef:
      name: airflow-azure-oauth
migrateDatabaseJob:
  useHelmHooks: false
  applyCustomEnv: false
  jobAnnotations:
    argocd.argoproj.io/hook: Sync
postgresql:
  enabled: false
metadataConnection:
  sslmode: require
pgbouncer:
  enabled: true
data:
  metadataSecretName: postgresql-connection-url
webserverSecretKeySecretName: airflow-webserver-secret-key
fernetKeySecretName: airflow-fernet-key
triggerer:
  replicas: 1
  serviceAccount:
    annotations:
      azure.workload.identity/client-id: <redacted>
scheduler:
  replicas: 1
  serviceAccount:
    annotations:
      azure.workload.identity/client-id: <redacted>
  args: ["bash", "-c", "exec airflow scheduler --verbose"]
config:
  kubernetes_executor:
    namespace: orch-dataplatform
  webserver:
    base_url: https://dev-airflow.privatelink.eastus2.azmk8s.io

smhood avatar Jan 25 '24 16:01 smhood

We have been seeing this issue basically ever since we upgraded from 2.7.3 -> 2.8.0 (and now on 2.8.1)

smhood avatar Jan 25 '24 16:01 smhood

Also, we are building our own Airflow image and copying our DAGs into it. Dockerfile:

FROM apache/airflow:2.8.1-python3.11
USER root
RUN apt-get update && apt-get install -y --no-install-recommends grep procps
COPY --chown=airflow:root ./dags/ $AIRFLOW_HOME/dags/
USER airflow
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

requirements.txt

apache-airflow-providers-cncf-kubernetes==7.13.0
azure-identity==1.15.0
azure-storage-blob==12.19.0
azure-servicebus==7.11.4
opencensus-ext-azure==1.1.11
pydantic==2.4.2
pyhumps==3.8.0

smhood avatar Jan 25 '24 17:01 smhood

I think we are running into the same problem with a similar setup (using the official Helm chart and upgraded from Airflow 2.7.3 to 2.8.1). After a while I stop seeing logs from the kubernetes_executor and tasks are stuck in queued after a deferred trigger event was fired, i.e. they go queued -> running -> deferred -> queued (and get stuck there). Restarting the scheduler helps and the stuck tasks complete as expected.

I have not yet been able to identify the moment when the kubernetes_executor (presumably) stops working.

bixel avatar Jan 31 '24 06:01 bixel

So things are potentially starting to work now. We were using an old version of the Helm chart, and after upgrading from 1.10 to 1.11 we are seeing the executors just work. Keeping this open until confirmed, but that was the solution for us at least.

smhood avatar Jan 31 '24 14:01 smhood

So after a brief period of things working again, we are now seeing the issue again.

Single Scheduler running on the 1.11 Helm Chart, Airflow 2.8.1.

smhood avatar Feb 02 '24 13:02 smhood

Looking into the latest occurrence, what is weird is that we are seeing the following logged event (screenshot):

However also see in the current running slots:

[2024-02-02T16:51:45.480+0000] {kubernetes_executor.py:387} DEBUG - self.running: {TaskInstanceKey(dag_id='acq__sftp__ohsu_partners_miah__prebill_daily_charges', task_id='get_execution_parameters', run_id='scheduled__2024-02-02T03:00:00+00:00', try_number=1, map_index=-1)}

This would seem to indicate that the code here: https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L705

is not finding the event in our database, correct? We have a single scheduler and are setting row-level locking to false, which means the query should be completely unaffected by any additional arguments.


smhood avatar Feb 02 '24 18:02 smhood

Additional logging showing what this looks like in terms of open slots (screenshot):

smhood avatar Feb 02 '24 18:02 smhood

@dirrao do I have to change labels in order to get a follow-up?

smhood avatar Feb 02 '24 20:02 smhood

I am experiencing a similar problem in 2.7.3.

[2024-02-04T07:19:20.201+0000] {scheduler_job_runner.py:1081} DEBUG - Executor full, skipping critical section
[2024-02-04T07:19:20.203+0000] {base_executor.py:217} DEBUG - 32 running task instances
[2024-02-04T07:19:20.203+0000] {base_executor.py:218} DEBUG - 0 in queue
[2024-02-04T07:19:20.203+0000] {base_executor.py:219} DEBUG - 0 open slots

Running two scheduler replicas and getting a lot of messages like scheduling was skipped, probably because the DAG record was locked.

I manually killed one of the scheduler pods and it helped alleviate the issue. My schedulers had not been restarted for the last 5 days.

airflow-scheduler-686459bbff-g68xj                                                 2/2     Running             0              19m
airflow-scheduler-686459bbff-gv9vc                                                 2/2     Running             0              5d11h

Airflow config

[core]
dags_folder = /opt/airflow/dags
hostname_callable = airflow.utils.net.getfqdn
might_contain_dag_callable = airflow.utils.file.might_contain_dag_via_default_heuristic
default_timezone = utc
executor = KubernetesExecutor
auth_manager = airflow.auth.managers.fab.fab_auth_manager.FabAuthManager
parallelism = 32
max_active_tasks_per_dag = 16
dags_are_paused_at_creation = True
max_active_runs_per_dag = 16
# mp_start_method =
load_examples = false
plugins_folder = /opt/airflow/plugins
execute_tasks_new_python_interpreter = False
fernet_key = xxxxxxx=
donot_pickle = True
dagbag_import_timeout = 30.0
dagbag_import_error_tracebacks = True
dagbag_import_error_traceback_depth = 2
dag_file_processor_timeout = 50
task_runner = StandardTaskRunner
default_impersonation =
security =
unit_test_mode = False
enable_xcom_pickling = False
allowed_deserialization_classes = airflow\..*
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = True
dag_discovery_safe_mode = True
dag_ignore_file_syntax = regexp
default_task_retries = 0
default_task_retry_delay = 300
max_task_retry_delay = 86400
default_task_weight_rule = downstream
default_task_execution_timeout =
min_serialized_dag_update_interval = 30
compress_serialized_dags = False
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
check_slas = True
xcom_backend = airflow.models.xcom.BaseXCom
lazy_load_plugins = True
lazy_discover_providers = True
hide_sensitive_var_conn_fields = True
sensitive_var_conn_names =
default_pool_task_slot_count = 512
max_map_length = 1024
daemon_umask = 0o077
# dataset_manager_class =
# dataset_manager_kwargs =
database_access_isolation = False
# internal_api_url =
test_connection = Disabled
colored_console_log = True
remote_logging = True
[database]
alembic_ini_file_path = alembic.ini
sql_alchemy_conn = postgresql+psycopg2://.....
# sql_alchemy_engine_args =
sql_engine_encoding = utf-8
# sql_engine_collation_for_ids =
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
sql_alchemy_schema =
# sql_alchemy_connect_args =
load_default_connections = True
max_db_retries = 3
check_migrations = True
[logging]
base_log_folder = /opt/airflow/logs
remote_logging = True
remote_log_conn_id = aws_conn_s3
delete_local_logs = False
google_key_path =
remote_base_log_folder = s3://s3-paas/airflow-prod-logs/
remote_task_handler_kwargs =
encrypt_s3_logs = False
logging_level = DEBUG
celery_logging_level =
fab_logging_level = WARNING
logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
dag_processor_log_target = file
dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
secret_mask_adapter =
task_log_prefix_template =
log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
extra_logger_names =
worker_log_server_port = 8793
trigger_log_server_port = 8794
# interleave_timestamp_parser =
file_task_handler_new_folder_permissions = 0o775
file_task_handler_new_file_permissions = 0o664
celery_stdout_stderr_separation = False
delete_worker_pods = False
[metrics]
metrics_allow_list =
metrics_block_list =
statsd_on = True
statsd_host = airflow-statsd
statsd_port = 9125
statsd_prefix = airflow
stat_name_handler =
statsd_datadog_enabled = False
statsd_datadog_tags =
statsd_datadog_metrics_tags = True
# statsd_custom_client_path =
statsd_disabled_tags = job_id,run_id
statsd_influxdb_enabled = False
otel_on = False
otel_host = localhost
otel_port = 8889
otel_prefix = airflow
otel_interval_milliseconds = 60000
otel_debugging_on = False
otel_ssl_active = False
[secrets]
backend =
backend_kwargs =
use_cache = False
cache_ttl_seconds = 900
[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080
[debug]
fail_fast = False
[api]
enable_experimental_api = False
auth_backends = airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
maximum_page_limit = 100
fallback_page_limit = 100
google_oauth2_audience =
google_key_path =
access_control_allow_headers =
access_control_allow_methods =
access_control_allow_origins =
enable_xcom_deserialize_support = False
[lineage]
backend =
[operators]
default_owner = airflow
default_deferrable = false
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
default_queue = default
allow_illegal_arguments = False
[webserver]
access_denied_message = Access is Denied
config_file = /opt/airflow/webserver_config.py
base_url = http://localhost:8080
default_ui_timezone = UTC
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
session_backend = database
web_server_master_timeout = 120
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
worker_refresh_interval = 6000
reload_on_plugin_change = False
secret_key =xxxxxxxx
workers = 4
worker_class = sync
access_logfile = -
error_logfile = -
access_logformat =
expose_config = False
expose_hostname = False
expose_stacktrace = False
dag_default_view = grid
dag_orientation = LR
grid_view_sorting_order = topological
log_fetch_timeout_sec = 5
log_fetch_delay_sec = 2
log_auto_tailing_offset = 30
log_animation_speed = 1000
hide_paused_dags_by_default = False
page_size = 100
navbar_color = #fff
default_dag_run_display_number = 25
enable_proxy_fix = True
proxy_fix_x_for = 1
proxy_fix_x_proto = 1
proxy_fix_x_host = 1
proxy_fix_x_port = 1
proxy_fix_x_prefix = 1
cookie_secure = False
cookie_samesite = Lax
default_wrap = False
x_frame_enabled = True
# analytics_tool =
# analytics_id =
show_recent_stats_for_completed_runs = True
update_fab_perms = True
session_lifetime_minutes = 43200
# instance_name =
instance_name_has_markup = False
auto_refresh_interval = 3
warn_deployment_exposure = True
audit_view_excluded_events = gantt,landing_times,tries,duration,calendar,graph,grid,tree,tree_data
# audit_view_included_events =
enable_swagger_ui = True
run_internal_api = False
auth_rate_limited = True
auth_rate_limit = 5 per 40 second
caching_hash_method = md5
show_trigger_form_if_no_params = False
rbac = True
[email]
email_backend = airflow.utils.email.send_email_smtp
email_conn_id = smtp_default
default_email_on_retry = True
default_email_on_failure = True
# subject_template =
# html_content_template =
# from_email =
ssl_context = default
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = true
smtp_ssl = true
smtp_user = [email protected]
smtp_password = [email protected]
smtp_port = 587
smtp_mail_from = [email protected]
smtp_timeout = 30
smtp_retry_limit = 5
[sentry]
sentry_on = false
sentry_dsn =
# before_send =
[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
local_task_job_heartbeat_sec = 0
num_runs = -1
scheduler_idle_sleep_time = 1
min_file_process_interval = 30
parsing_cleanup_interval = 60
stale_dag_threshold = 50
dag_dir_list_interval = 30
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 30
enable_health_check = False
scheduler_health_check_server_port = 8974
orphaned_tasks_check_interval = 300.0
child_process_log_directory = /opt/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
zombie_detection_interval = 10.0
catchup_by_default = True
ignore_first_depends_on_past_by_default = True
max_tis_per_query = 16
use_row_level_locking = True
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
schedule_after_task_execution = True
parsing_pre_import_modules = True
parsing_processes = 2
file_parsing_sort_mode = modified_time
standalone_dag_processor = False
max_callbacks_per_loop = 20
dag_stale_not_seen_duration = 600
use_job_schedule = True
allow_trigger_in_future = False
trigger_timeout_check_interval = 15
task_queued_timeout = 600.0
task_queued_timeout_check_interval = 120.0
allowed_run_id_pattern = ^[A-Za-z0-9_.~:+-]+$
logging_level = DEBUG
run_duration = 41460
statsd_host = airflow-statsd
statsd_on = True
statsd_port = 9125
statsd_prefix = airflow
[triggerer]
default_capacity = 1000
job_heartbeat_sec = 5
triggerer_health_check_threshold = 30
[sensors]
default_timeout = 604800
[aws]
# session_factory =
cloudwatch_task_handler_json_serializer = airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy
[aws_ecs_executor]
conn_id = aws_default
# region_name =
assign_public_ip = False
# cluster =
# container_name =
launch_type = FARGATE
platform_version = LATEST
# security_groups =
# subnets =
# task_definition =
max_run_task_attempts = 3
# run_task_kwargs =
[celery_kubernetes_executor]
kubernetes_queue = kubernetes
[celery]
celery_app_name = airflow.providers.celery.executors.celery_executor
worker_concurrency = 16
# worker_autoscale =
worker_prefetch_multiplier = 1
worker_enable_remote_control = true
broker_url = redis://redis:6379/0
# result_backend =
result_backend_sqlalchemy_engine_options =
flower_host = 0.0.0.0
flower_url_prefix = /
flower_port = 5555
flower_basic_auth =
sync_parallelism = 0
celery_config_options = airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
operation_timeout = 1.0
task_track_started = True
task_publish_max_retries = 3
worker_precheck = False
[celery_broker_transport_options]
# visibility_timeout =
# sentinel_kwargs =
[local_kubernetes_executor]
kubernetes_queue = kubernetes
[kubernetes_executor]
api_client_retry_configuration =
logs_task_metadata = False
pod_template_file = /opt/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = xyz.com/airflow-prod
worker_container_tag = a6a136ee
namespace = airflow
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
multi_namespace_mode_namespace_list =
in_cluster = True
# cluster_context =
# config_file =
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
verify_ssl = True
worker_pods_queued_check_interval = 60
ssl_ca_cert =
[dask]
cluster_address = 127.0.0.1:8786
tls_ca =
tls_cert =
tls_key =

[azure_remote_logging]
remote_wasb_log_container = airflow-logs
[kubernetes]
airflow_configmap = airflow-airflow-config
airflow_local_settings_configmap = airflow-airflow-config
multi_namespace_mode = False
namespace = airflow
pod_template_file = /opt/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = xyz.com/airflow-prod
worker_container_tag = a6a136ee

aki263 avatar Feb 04 '24 07:02 aki263

(screenshot of scheduler logs)

Able to capture a pretty good log of what we are seeing. Things were working fine yesterday between 09:00 and 17:00, then 18:00 comes along and we start seeing the execution lock issue. No deployments happened at this time, the same DAGs were running, and there are no error logs as far as we can tell just yet.

smhood avatar Feb 06 '24 14:02 smhood

After a couple of weeks with no responses on this post we decided to revert to 2.7.2, and the issue is gone. Down the road we will investigate being able to use 2.8.x on something like Astronomer, but we can definitely confirm that the 2.7 -> 2.8 upgrade caused this issue.

smhood avatar Feb 19 '24 13:02 smhood

It looks like the scheduler or the kubernetes_executor cannot recover from communication issues with Kubernetes. I've collected a few hours of logging after a restart of the scheduler, and the problems seem to occur after the following lines:

[2024-02-22T10:24:59.431+0000] {kubernetes_executor_utils.py:121} ERROR - Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 710, in _error_catcher
    yield
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1073, in read_chunked
    self._update_chunk_length()
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1008, in _update_chunk_length
    raise InvalidChunkLength(self, line) from None
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 112, in run
    self.resource_version = self._run(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 168, in _run
    for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 933, in stream
    yield from self.read_chunked(amt, decode_content=decode_content)
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1061, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 727, in _error_catcher
    raise ProtocolError(f"Connection broken: {e!r}", e) from e
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
Process KubernetesJobWatcher-3:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 710, in _error_catcher
    yield
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1073, in read_chunked
    self._update_chunk_length()
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1008, in _update_chunk_length
    raise InvalidChunkLength(self, line) from None
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 112, in run
    self.resource_version = self._run(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 168, in _run
    for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 933, in stream
    yield from self.read_chunked(amt, decode_content=decode_content)
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1061, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 727, in _error_catcher
    raise ProtocolError(f"Connection broken: {e!r}", e) from e
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
[2024-02-22T10:24:59.568+0000] {kubernetes_executor_utils.py:359} ERROR - Error while health checking kube watcher process for namespace airflow. Process died for unknown reasons
[2024-02-22T10:24:59.586+0000] {kubernetes_executor_utils.py:157} INFO - Event: and now my watch begins starting at resource_version: 0

After that, tasks are stuck in queued and I don't see any more lines of the kind

[2024-02-22T10:16:50.260+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey

I can only recover from that state by clearing all scheduled and queued tasks and restarting the scheduler. I wasn't able to dig deeper into the kubernetes_executor yet, but there seem to be quite a few changes between 2.7.3 and 2.8.1. That would be my first guess for the origin of this.

bixel avatar Feb 22 '24 12:02 bixel

Hi! I had the same error with 2.7.3, described in https://github.com/apache/airflow/issues/36478, and I tested it on 2.8.4 and the bug still exists.

crabio avatar Apr 09 '24 07:04 crabio

Update: I have the same error with 1 scheduler on Airflow 2.8.4. But I think the error may also be in the Kubernetes provider. Libs:

apache-airflow == 2.8.4
dbt-core == 1.7.11
dbt-snowflake == 1.7.3
apache-airflow[statsd]
facebook-business == 19.0.2
google-ads == 22.1.0
twitter-ads == 11.0.0
acryl-datahub-airflow-plugin == 0.10.2.3
acryl-datahub[dbt] == 0.10.2.3
checksumdir
filelock
openpyxl
cronsim
apache-airflow-providers-snowflake
apache-airflow-providers-cncf-kubernetes == 8.0.1
apache-airflow-providers-apache-kafka == 1.3.1
apache-airflow-providers-slack == 8.6.1
apache-airflow-providers-amazon
kubernetes
snowplow_analytics_sdk
elementary-data == 0.14.1

crabio avatar Apr 09 '24 10:04 crabio

@smhood I have downgraded the Airflow version to 2.7.2, but the issue still exists...

apache-airflow == 2.7.2
dbt-core == 1.7.11
dbt-snowflake == 1.7.3
apache-airflow[statsd]
facebook-business == 19.0.2
google-ads == 22.1.0
twitter-ads == 11.0.0
acryl-datahub-airflow-plugin == 0.10.2.3
acryl-datahub[dbt] == 0.10.2.3
checksumdir
filelock
openpyxl
cronsim
apache-airflow-providers-snowflake
apache-airflow-providers-cncf-kubernetes == 8.0.1
apache-airflow-providers-apache-kafka == 1.3.1
apache-airflow-providers-slack == 8.6.1
apache-airflow-providers-amazon
kubernetes
snowplow_analytics_sdk
elementary-data == 0.14.1

Maybe the issue is inside the provider packages?

crabio avatar Apr 10 '24 07:04 crabio

Found emails related to the same issue: https://www.mail-archive.com/[email protected]/msg309101.html

crabio avatar Apr 10 '24 07:04 crabio

I found a workaround and some insights: if the scheduler parallelism is less than the sum of all slots across pools (e.g. parallelism of 32 with, say, 8 pools of 8 slots each) and all slots are used, the scheduler starts leaking slots after some time. Workaround: set parallelism higher than the total number of pool slots you have.
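
A quick way to compare the two numbers is sketched below, under the assumption that it runs somewhere with the Airflow package and metadata DB configured (e.g. inside the scheduler pod):

from airflow.configuration import conf
from airflow.models import Pool
from airflow.utils.session import create_session

parallelism = conf.getint("core", "parallelism")
with create_session() as session:
    total_pool_slots = sum(pool.slots for pool in session.query(Pool).all())

print(f"core.parallelism = {parallelism}, total pool slots = {total_pool_slots}")
if parallelism <= total_pool_slots:
    print("Executor slots can run out before pool slots do; consider raising parallelism.")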

crabio avatar Apr 10 '24 08:04 crabio

@crabio, were you able to find a solution? We are also facing the task leak issue in v2.6.3.

paramjeet01 avatar Apr 16 '24 10:04 paramjeet01

@paramjeet01 Not fully; we still have the slot leak (screenshot):

But we found a workaround:

  1. Set AIRFLOW__CORE__PARALLELISM to 512+ to keep free slots in the scheduler, or at least to a value bigger than the sum of slots across all the pools that you have.
  2. Try setting AIRFLOW__SCHEDULER__NUM_RUNS to restart the scheduler periodically. We are going to try this option with a 4-hour scheduler lifetime.
  3. Check that you don't have zombie pods in any of the namespaces you are using for Airflow (see the sketch below). We have some namespaces where, we think, the Airflow scheduler has no access to kill pods, and we have zombie succeeded tasks.
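
For point 3, a rough way to spot leftover completed worker pods across the namespaces Airflow uses is sketched below with the plain kubernetes Python client; the namespace list is an assumption about your setup, and you may additionally want to filter on the labels the executor puts on worker pods:

from kubernetes import client, config

def find_completed_pods(namespaces):
    # Use config.load_incluster_config() instead if running inside the cluster.
    config.load_kube_config()
    v1 = client.CoreV1Api()
    leftovers = []
    for ns in namespaces:
        pods = v1.list_namespaced_pod(
            namespace=ns,
            field_selector="status.phase=Succeeded",  # completed pods the executor should have cleaned up
        )
        leftovers.extend((ns, pod.metadata.name) for pod in pods.items)
    return leftovers

for ns, name in find_completed_pods(["airflow", "orch-dataplatform"]):  # hypothetical namespace list
    print(ns, name)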

crabio avatar Apr 16 '24 10:04 crabio

@crabio I have updated my comments in https://github.com/apache/airflow/issues/38968#issuecomment-2059521327. I was able to improve the performance and the tasks no longer have long queue durations.

paramjeet01 avatar Apr 16 '24 16:04 paramjeet01

I see similarities between the issue we are facing and the one you describe.

Airflow 2.8.4. We run one instance of the scheduler and we observe a list of completed tasks (usually cleared) like the one in https://github.com/apache/airflow/issues/33402 (screenshot):

Config:

  • core parallelism = 32
  • default pool slots = 128

At 3 AM more tasks were supposed to run, but nothing like that happened.

In the morning we ended up having no running tasks and many in the queued state.

aru-trackunit avatar Apr 17 '24 12:04 aru-trackunit

@crabio Could you please post steps to reproduce the issue? Then I could spend a little bit more time understanding it

aru-trackunit avatar Apr 17 '24 15:04 aru-trackunit

@aru-trackunit Sure

  1. run Airflow with the Kubernetes Executor and 1 scheduler
  2. run some tasks in your default namespace (this may not be required) and run tasks in another namespace where Airflow has access to create, watch, and delete pods
  3. run Airflow with active tasks for some time; you need tasks running for a while to catch the slot leak

Maybe the tasks in another namespace are not required, because we faced this issue before we started using multiple namespaces. I think @paramjeet01 and @smhood have 1 namespace.

crabio avatar Apr 17 '24 19:04 crabio

@crabio , Yes we run in single namespace.

paramjeet01 avatar Apr 17 '24 19:04 paramjeet01

I am seeing the issue in a single namespace. The scheduler fails to remove pods in the Completed state, and after the running slots reach 32, newly scheduled tasks aren't getting queued.

airflow==2.8.4
apache-airflow-providers-cncf-kubernetes==7.14.0
kubernetes==23.6.0
urllib3==2.0.7

The KubernetesJobWatcher failed a couple of times but it was able to restart. In the logs below, the Watcher running with PID 2034 failed. On the next sync of the executor, it was able to start back up with PID 3740.

[2024-04-18T23:29:34.285+0000] [2034:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:121} ERROR - Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 710, in _error_catcher
    yield
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 1073, in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 1008, in _update_chunk_length
    raise InvalidChunkLength(self, line) from None
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 112, in run
    self.resource_version = self._run(
                            ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 168, in _run
    for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 933, in stream
    yield from self.read_chunked(amt, decode_content=decode_content)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 1061, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 727, in _error_catcher
    raise ProtocolError(f"Connection broken: {e!r}", e) from e
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
[2024-04-18T23:29:35.067+0000] [ 7:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:449} DEBUG - Syncing KubernetesExecutor
[2024-04-18T23:29:35.067+0000] [ 7:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:359} ERROR - Error while health checking kube watcher process for namespace astronomer-contractual-po-prod. Process died for unknown reasons
[2024-04-18T23:29:35.078+0000] [3740:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:157} INFO - Event: and now my watch begins starting at resource_version: 0

It was working fine for a minute; it was reporting back the pod changes.

[2024-04-18T23:30:50.605+0000] [3740:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:170} DEBUG - Event: strategizer-monitor-check-strategizer-bid-statuses-lz2ww1cv had an event of type DELETED
[2024-04-18T23:30:50.605+0000] [3740:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:256} INFO - Skipping event for Succeeded pod strategizer-monitor-check-strategizer-bid-statuses-lz2ww1cv - event for this pod already sent to executor

After this, the Watcher went silent; there were no more logs with PID 3740. The KubernetesExecutor.running set kept increasing:

[2024-04-18T23:40:01.059+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:217} DEBUG - 1 running task instances
[2024-04-18T23:40:01.060+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:218} DEBUG - 0 in queue
[2024-04-18T23:40:01.060+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:219} DEBUG - 31 open slots
...
[2024-04-19T13:24:44.721+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:217} DEBUG - 32 running task instances
[2024-04-19T13:24:44.721+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:218} DEBUG - 0 in queue
[2024-04-19T13:24:44.721+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:219} DEBUG - 0 open slots

I confirmed that PID 3740 is still running.

/usr/local/airflow$ ps -ef
UID   PID PPID  C STIME     TIME CMD
astro    1    0  0 Apr18 00:00:00 tini -- /entrypoint bash -c exec airflow scheduler
astro    7    1  4 Apr18 00:37:17 /usr/local/bin/python /usr/local/bin/airflow scheduler
astro   24    7  0 Apr18 00:01:06 /usr/local/bin/python /usr/local/bin/airflow scheduler
astro   33    7  0 Apr18 00:00:41 /usr/local/bin/python /usr/local/bin/airflow scheduler
astro   44    7  1 Apr18 00:12:28 airflow scheduler -- DagFileProcessorManager
astro 3740    7  0 Apr18 00:00:00 /usr/local/bin/python /usr/local/bin/airflow scheduler  <=== KubernetesJobWatcher

The KubernetesJobWatcher is stuck in the generator self._pod_events(kube_client=kube_client, query_kwargs=kwargs)

https://github.com/apache/airflow/blob/cead3da4a6f483fa626b81efd27a24dcb5a36ab0/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py#L168

waiting for a stream from the kubernetes Watch.stream() and urllib3 HTTPResponse.stream()

https://github.com/kubernetes-client/python/blob/98cd2251152fcdbfa6de24c85384887b0999a94c/kubernetes/base/watch/watch.py#L56

https://github.com/urllib3/urllib3/blob/56f01e088dc006c03d4ee6ea9da4ab810f1ed700/src/urllib3/response.py#L914
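
For comparison, a defensive watch loop that cannot block forever looks roughly like the sketch below. This is written against the plain kubernetes Python client, not Airflow's implementation: timeout_seconds asks the API server to end the watch periodically, and _request_timeout bounds the client-side read so a silently dropped connection raises instead of hanging; resource_version handling is deliberately simplified here.

from kubernetes import client, config, watch

def watch_pods(namespace: str, resource_version: str | None = None) -> None:
    config.load_incluster_config()
    v1 = client.CoreV1Api()
    w = watch.Watch()
    while True:
        try:
            for event in w.stream(
                v1.list_namespaced_pod,
                namespace=namespace,
                resource_version=resource_version,
                timeout_seconds=300,    # server ends the watch; the loop re-establishes it
                _request_timeout=330,   # socket-level timeout so a dead connection raises
            ):
                pod = event["object"]
                resource_version = pod.metadata.resource_version
                print(event["type"], pod.metadata.name, pod.status.phase)
        except Exception:
            # Broken or stale connections restart the watch instead of blocking forever.
            continue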

karunpoudel-chr avatar Apr 19 '24 15:04 karunpoudel-chr