
could not queue task - airflow 2.3.4 - kubernetes executor

bertrand-buffat opened this issue

Apache Airflow version

2.3.4

What happened

Tasks are no longer scheduled since upgrading to Airflow 2.3.4, with logs:

{base_executor.py:211} INFO - task TaskInstanceKey(dag_id='sys_liveness', task_id='liveness', run_id='scheduled__2022-09-14T12:15:00+00:00', try_number=1, map_index=-1) is still running
{base_executor.py:215} ERROR - could not queue task TaskInstanceKey(dag_id='sys_liveness', task_id='liveness', run_id='scheduled__2022-09-14T12:15:00+00:00', try_number=1, map_index=-1) (still running after 4 attempts)

This happens even with only one DAG running in a simple deployment, scheduled every 5 minutes, and with enough slots in the default pool.
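
For context, the messages seem to mean the executor still tracks the task key in its internal "running" set and therefore refuses to queue it again. Roughly something like this (a simplified, hypothetical sketch based on the log lines above, not the actual base_executor.py code):

import logging

log = logging.getLogger(__name__)


# Simplified, hypothetical sketch of the queueing check implied by the logs above;
# "executor.running" (a set of keys) and "executor.queued_tasks" (a dict) mirror
# BaseExecutor attributes, but the retry loop here is illustrative only.
def try_queue(executor, key, command, max_attempts=4):
    for _ in range(max_attempts):
        if key not in executor.running:            # key no longer considered running
            executor.queued_tasks[key] = command   # safe to queue
            return True
        log.info("task %s is still running", key)
    log.error("could not queue task %s (still running after %d attempts)", key, max_attempts)
    return False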

What you think should happen instead

Tasks should be executed.

How to reproduce

No response

Operating System

Debian GNU/Linux

Versions of Apache Airflow Providers

apache-airflow-providers-amazon = "4.1.0"
apache-airflow-providers-cncf-kubernetes = "4.0.2"
apache-airflow-providers-http = "2.1.2"
apache-airflow-providers-mysql = "2.2.3"
apache-airflow-providers-postgres = "4.1.0"
apache-airflow-providers-ssh = "2.4.4"
apache-airflow-providers-sqlite = "2.1.3"

Deployment

Official Apache Airflow Helm Chart

Deployment details

kubernetes executor

[core]
hostname_callable = socket.getfqdn
default_timezone = utc
executor = KubernetesExecutor
parallelism = 512
max_active_tasks_per_dag = 128
dags_are_paused_at_creation = False
max_active_runs_per_dag = 1
load_examples = False
plugins_folder = /home/airflow/plugins
execute_tasks_new_python_interpreter = False
donot_pickle = False
dagbag_import_timeout = 30.0
dagbag_import_error_tracebacks = True
dagbag_import_error_traceback_depth = 2
dag_file_processor_timeout = 300
task_runner = StandardTaskRunner
default_impersonation =
security =
unit_test_mode = False
enable_xcom_pickling = False
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = True
dag_discovery_safe_mode = True
dag_ignore_file_syntax = regexp
default_task_retries = 0
default_task_weight_rule = downstream
default_task_execution_timeout =
min_serialized_dag_update_interval = 30
compress_serialized_dags = False
min_serialized_dag_fetch_interval = 30
max_num_rendered_ti_fields_per_task = 30
check_slas = True
xcom_backend = airflow.models.xcom.BaseXCom
lazy_load_plugins = True
lazy_discover_providers = True
hide_sensitive_var_conn_fields = True
sensitive_var_conn_names =
default_pool_task_slot_count = 128
max_map_length = 1024
daemon_umask = 0o077
secure_mode = False
dag_concurrency = 256
non_pooled_task_slot_count = 512
sql_alchemy_schema = airflow

[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
num_runs = -1
scheduler_idle_sleep_time = 1
min_file_process_interval = 400
deactivate_stale_dags_interval = 60
dag_dir_list_interval = 300
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 30
orphaned_tasks_check_interval = 300.0
child_process_log_directory = /home/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
zombie_detection_interval = 10.0
catchup_by_default = True
ignore_first_depends_on_past_by_default = True
max_tis_per_query = 512
use_row_level_locking = True
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
schedule_after_task_execution = True
parsing_processes = 4
file_parsing_sort_mode = modified_time
standalone_dag_processor = False
max_callbacks_per_loop = 20
use_job_schedule = True
allow_trigger_in_future = False
dependency_detector = airflow.serialization.serialized_objects.DependencyDetector
trigger_timeout_check_interval = 15
max_threads = 5

Anything else

When we restart the scheduler, the queued tasks are executed, but the next task after that isn't and gets stuck in queued again.

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

bertrand-buffat avatar Sep 14 '22 13:09 bertrand-buffat

Hey! I'm facing the same issue and I have a similar configuration. I'm going to share what I tested so far, but first a bit of background:

  • I had a perfectly functional 2.2.3 on Kubernetes [EKS 1.21] (installed with the bitnami/airflow Helm chart).
  • I upgraded straight to 2.3.4, running airflow db upgrade, and everything was fine (for the first 10 minutes).
  • Tasks wouldn't schedule, with no recognizable pattern (some of them would run manually but wouldn't start when their scheduled time came, or the other way around. Totally random, no clue).

This is what I have tested so far, without much luck anyway:

  • I thought something was wrong with the metadata store (PostgreSQL in my case), so I cleaned some DAG history by deleting and re-importing the DAGs, but without success.
  • Rebooting the scheduler would (sometimes) restart the queued tasks, but it would eventually start to stall again.
  • I created a new namespace and replicated a fresh installation of 2.3.4; I tested some DAGs and the problem is still there.

I think there is something wrong with this version and I will roll back to 2.2.5 tomorrow, because I have some production workloads that I would like to keep going.

If I have more info on this, I will reply here.

albertopinardi avatar Sep 15 '22 20:09 albertopinardi

Hi everyone,

I am facing the same issue. Basically, after updating to 2.3.4 some of my scheduled tasks would not run, i.e. the DAG run itself would be labeled as running, but the first task in the DAG would just get set to queued and would never run. The scheduler would report the same:

INFO - task TaskInstanceKey(dag_id='...', task_id='...', run_id='scheduled__2022-09-16T12:49:00+00:00', try_number=1, map_index=-1) is still running
scheduler [2022-09-16T12:58:17.664+0000] {base_executor.py:215} ERROR - could not queue task TaskInstanceKey(dag_id='....', task_id='...', run_id='scheduled__2022-09-16T12:47:00+00:00', try_number=1, map_index=-1) (still running after 4 attempts)

Running the task manually would succeed, but scheduled tasks would not run. If I drop the dag_runs from the DB, the first scheduled DAG run would succeed, but all runs after it would halt as described above.
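
For reference, "dropping the dag_runs from the DB" was roughly the following (a hypothetical sketch using Airflow's ORM against the metadata database; the dag_id is a placeholder):

# Hypothetical cleanup sketch: delete the existing DagRuns of one DAG from the
# metadata database (in Airflow 2.3 the task instances are removed with them
# via the FK cascade).
from airflow.models import DagRun
from airflow.utils.session import create_session

with create_session() as session:
    session.query(DagRun).filter(DagRun.dag_id == "my_dag").delete(synchronize_session=False)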

Miskho avatar Sep 16 '22 13:09 Miskho

Update: the downgrade to 2.2.5 was unsuccessful, so I performed an airflow db erase to ease some scheduler pain over a misaligned schema in the metadata store. The fresh 2.2.5 has performed just fine (so far). I think I'm going to leave things as they are right now and just follow the outcomes of the other issues on the same topic.

albertopinardi avatar Sep 16 '22 21:09 albertopinardi

You can also try 2.4.0, released today. There are many bug fixes, and while we cannot pinpoint the problem to a specific fix, several similar issues were fixed there.

potiuk avatar Sep 19 '22 11:09 potiuk

Not sure if it's totally related to this, but I'm also getting some weird behavior with the k8s executor and pod_override. See details here. Then, when I clear the task from the UI, I get errors similar to the one posted above...
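
For anyone not familiar with it, the kind of override I mean looks roughly like this (an illustrative sketch with made-up values, not my actual override; the real one is in the linked details):

# Illustrative executor_config pod_override (hypothetical values only).
from kubernetes.client import models as k8s
from airflow.operators.bash import BashOperator

task = BashOperator(
    task_id="example",
    bash_command="echo hello",
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",  # "base" is the worker container the override is merged into
                        resources=k8s.V1ResourceRequirements(requests={"memory": "512Mi"}),
                    )
                ]
            )
        )
    },
)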

nicor88 avatar Sep 19 '22 19:09 nicor88

Finally found the same issue as ours! Tried 2.4.0; it doesn't help, tasks are still stuck in queued with the same behaviour. Downgrading to 2.3.3 fixes the issue.

Also, with cncf-kubernetes 4.3.0 the same problem occurs for any Airflow version > 2.3.3.

george-zubrienko avatar Sep 19 '22 19:09 george-zubrienko

This is impacting only one of our DAGs (we're on version 2.3.4), but it isn't clear why it is happening.

ajbosco avatar Sep 23 '22 15:09 ajbosco

Please see if you can create a minimal reproducible example; otherwise it's hard to find the root cause.

eladkal avatar Sep 26 '22 07:09 eladkal

Hello @eladkal,

I work with @bertrand-buffat; I'd like to add a few details in order to be as precise as possible about the current problem.

Kubernetes cluster version: 1.21; AWS platform version: eks.10 (https://docs.aws.amazon.com/eks/latest/userguide/platform-versions.html#:~:text=April%204%2C%202022-,Kubernetes%20version%201.21,-The%20following%20admission)

(Below is an example of a DAG that works on v2.2.2 and fails on v2.4.0.)

Thanks for your help 🙂

# plugins/models/airflow.py (module path inferred from the import in the DAG file below)
from kubernetes.client import models as k8s

from airflow.models import DAG


class DataKubeDAG(DAG):
    def __init__(self, dag_id, default_args, tags=None, **kwargs):
        self.log.info(f"Loading dag: {dag_id}")
        # Tag every DAG with its owner plus any extra tags passed in
        full_tags = [default_args["owner"]]
        full_tags += tags or []
        super().__init__(dag_id=dag_id, default_args=default_args, tags=full_tags, **kwargs)

    def add_task(self, task):
        # Expose Datadog tags as pod annotations through the KubernetesExecutor pod_override
        custom_annotations = k8s.V1ObjectMeta(
            annotations={"ad.datadoghq.com/tags": f'{{"airflow_dag_id": "{self.dag_id}", "airflow_task_id": "{task.task_id}"}}'}
        )
        if task.executor_config.get("pod_override") is None:
            task.executor_config = {"pod_override": k8s.V1Pod()}
        task.executor_config["pod_override"].metadata = custom_annotations
        super().add_task(task)
        return
# functions/default_args.py (module path inferred from the import in the DAG file below)
from kubernetes.client import models as k8s
from datetime import timedelta

from airflow.utils.dates import days_ago


def obtain_default_args(owner: str = "me", retries: int = 6):
    # Give "owner" a default so the no-argument call in the DAG file below works
    return {
        "owner": owner,
        "depends_on_past": False,
        "email": "[email protected]",
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": retries,
        "retry_delay": timedelta(minutes=5),
        "start_date": days_ago(1),
        "catchup": False,
        "execution_timeout": timedelta(seconds=86400),
        "priority_weight": 1,
        "executor_config": {"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={}))},
    }
# The DAG file itself (imports the two modules above)
from airflow.operators.dummy_operator import DummyOperator

from functions.default_args import obtain_default_args
from plugins.models.airflow import DataKubeDAG

default_args = obtain_default_args()
default_args["retries"] = 0

with DataKubeDAG(
    dag_id="sys_liveness",
    schedule_interval="*/5 * * * *",
    default_args=default_args,
    catchup=False,
) as dag:
    start_task = DummyOperator(task_id="liveness")
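
With the files above loaded, the task's executor_config ends up roughly as follows (a quick illustrative check, assuming the subclass behaves as written; this is not output captured from our cluster):

# Illustrative check of what DataKubeDAG.add_task leaves in executor_config for
# the "liveness" task above (assumes the three files above import as shown).
print(start_task.executor_config["pod_override"].metadata.annotations)
# Expected, roughly:
# {'ad.datadoghq.com/tags': '{"airflow_dag_id": "sys_liveness", "airflow_task_id": "liveness"}'}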

lucas481516 avatar Sep 26 '22 18:09 lucas481516

Can you please all try 2.4.1 (released yesterday with the latest cncf.kubernetes provider) and see if your problems are fixed? I believe a few errors regarding scheduling getting stuck were fixed there, so it is actually quite likely your issues have been addressed.

We have no logs to "prove" this is the case, but I think trying it out is the easiest way for everyone to check without spending too much time on "guesswork". @ajbosco @lucas481516 @george-zubrienko

If the issue still persists, I am afraid we will need more complete logs from the scheduler and the Kubernetes pods from around the time it happens; otherwise it will be super difficult to guess the reason.

potiuk avatar Oct 02 '22 01:10 potiuk

I will try it this week around Wednesday. Was on vacation :)

george-zubrienko avatar Oct 10 '22 13:10 george-zubrienko

Got a bit delayed, will test tomorrow and come back with results. Sorry.

george-zubrienko avatar Oct 18 '22 14:10 george-zubrienko

@potiuk We no longer observe the issue on the following setup:

Kubernetes Executor (1.22.11) / Airflow 2.4.1-python3.9 / Helm Chart 1.7.0 with dag processor enabled.

george-zubrienko avatar Oct 20 '22 10:10 george-zubrienko

Cool, closing it as non-reproducible.

eladkal avatar Oct 20 '22 10:10 eladkal