could not queue task - airflow 2.3.4 - kubernetes executor
Apache Airflow version
2.3.4
What happened
Since upgrading to Airflow 2.3.4, tasks are no longer scheduled, and the scheduler logs:
{base_executor.py:211} INFO - task TaskInstanceKey(dag_id='sys_liveness', task_id='liveness', run_id='scheduled__2022-09-14T12:15:00+00:00', try_number=1, map_index=-1) is still running
{base_executor.py:215} ERROR - could not queue task TaskInstanceKey(dag_id='sys_liveness', task_id='liveness', run_id='scheduled__2022-09-14T12:15:00+00:00', try_number=1, map_index=-1) (still running after 4 attempts)
This happens even with only one DAG running in a simple deployment, scheduled every 5 minutes, and with enough slots in the default pool.
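The two log lines read like a bounded retry. The executor refuses to launch a task whose key it still considers running, logs "is still running" a few times, then gives up and drops the task from its queue. That would leave the task instance sitting in queued. The sketch below is a simplified, hypothetical model of that behavior in plain Python. ToyExecutor, QUEUEING_ATTEMPTS and its value are assumptions chosen to match the log wording. They are not Airflow's actual implementation.

```python
from collections import defaultdict

# Assumption: 5 total checks, so the final error reports "after 4 attempts",
# matching the wording of the scheduler log above.
QUEUEING_ATTEMPTS = 5


class ToyExecutor:
    """Hypothetical, simplified model of executor bookkeeping (not Airflow's class)."""

    def __init__(self):
        self.queued = {}                   # key -> command waiting to be launched
        self.running = set()               # keys the executor believes are running
        self.attempts = defaultdict(int)   # key -> times it was found "still running"
        self.launched = []
        self.logs = []

    def trigger(self):
        for key in list(self.queued):
            if key in self.running:
                attempt = self.attempts[key]
                if attempt < QUEUEING_ATTEMPTS - 1:
                    self.attempts[key] = attempt + 1
                    self.logs.append(f"INFO - task {key} is still running")
                    continue
                # Give up: the task is dropped from the queue and never launched.
                self.logs.append(
                    f"ERROR - could not queue task {key} "
                    f"(still running after {attempt} attempts)"
                )
                del self.attempts[key]
                del self.queued[key]
            else:
                self.launched.append(key)
                del self.queued[key]


ex = ToyExecutor()
key = ("sys_liveness", "liveness", "scheduled__2022-09-14T12:15:00+00:00", 1)
ex.queued[key] = "command"
ex.running.add(key)  # the executor still lists this key as running
for _ in range(QUEUEING_ATTEMPTS):
    ex.trigger()
print("\n".join(ex.logs))
```

This only illustrates the symptom. It does not explain why the key stays in the executor's running set in the first place.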
What you think should happen instead
Tasks should be executed.
How to reproduce
No response
Operating System
Debian GNU/Linux
Versions of Apache Airflow Providers
apache-airflow-providers-amazon = "4.1.0"
apache-airflow-providers-cncf-kubernetes = "4.0.2"
apache-airflow-providers-http = "2.1.2"
apache-airflow-providers-mysql = "2.2.3"
apache-airflow-providers-postgres = "4.1.0"
apache-airflow-providers-ssh = "2.4.4"
apache-airflow-providers-sqlite = "2.1.3"
Deployment
Official Apache Airflow Helm Chart
Deployment details
kubernetes executor
[core]
hostname_callable = socket.getfqdn
default_timezone = utc
executor = KubernetesExecutor
parallelism = 512
max_active_tasks_per_dag = 128
dags_are_paused_at_creation = False
max_active_runs_per_dag = 1
load_examples = False
plugins_folder = /home/airflow/plugins
execute_tasks_new_python_interpreter = False
donot_pickle = False
dagbag_import_timeout = 30.0
dagbag_import_error_tracebacks = True
dagbag_import_error_traceback_depth = 2
dag_file_processor_timeout = 300
task_runner = StandardTaskRunner
default_impersonation =
security =
unit_test_mode = False
enable_xcom_pickling = False
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = True
dag_discovery_safe_mode = True
dag_ignore_file_syntax = regexp
default_task_retries = 0
default_task_weight_rule = downstream
default_task_execution_timeout =
min_serialized_dag_update_interval = 30
compress_serialized_dags = False
min_serialized_dag_fetch_interval = 30
max_num_rendered_ti_fields_per_task = 30
check_slas = True
xcom_backend = airflow.models.xcom.BaseXCom
lazy_load_plugins = True
lazy_discover_providers = True
hide_sensitive_var_conn_fields = True
sensitive_var_conn_names =
default_pool_task_slot_count = 128
max_map_length = 1024
daemon_umask = 0o077
secure_mode = False
dag_concurrency = 256
non_pooled_task_slot_count = 512
sql_alchemy_schema = airflow
[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
num_runs = -1
scheduler_idle_sleep_time = 1
min_file_process_interval = 400
deactivate_stale_dags_interval = 60
dag_dir_list_interval = 300
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 30
orphaned_tasks_check_interval = 300.0
child_process_log_directory = /home/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
zombie_detection_interval = 10.0
catchup_by_default = True
ignore_first_depends_on_past_by_default = True
max_tis_per_query = 512
use_row_level_locking = True
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
schedule_after_task_execution = True
parsing_processes = 4
file_parsing_sort_mode = modified_time
standalone_dag_processor = False
max_callbacks_per_loop = 20
use_job_schedule = True
allow_trigger_in_future = False
dependency_detector = airflow.serialization.serialized_objects.DependencyDetector
trigger_timeout_check_interval = 15
max_threads = 5
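To check the claim that capacity is not the bottleneck, you can read the relevant limits straight from the configuration above. The following is a minimal sketch using Python's configparser on an excerpt of the posted [core] values. effective_task_cap is a hypothetical helper. Taking the minimum is only a rough upper bound for a single DAG whose tasks all use default_pool.

```python
import configparser

# Excerpt of the [core] settings posted in this report.
CONFIG = """
[core]
parallelism = 512
max_active_tasks_per_dag = 128
max_active_runs_per_dag = 1
default_pool_task_slot_count = 128
"""


def effective_task_cap(text: str) -> int:
    """Rough cap on concurrent tasks for one DAG using default_pool."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    core = parser["core"]
    # Each of these limits must have room before another task can be queued.
    return min(
        core.getint("parallelism"),
        core.getint("max_active_tasks_per_dag"),
        core.getint("default_pool_task_slot_count"),
    )


print(effective_task_cap(CONFIG))  # 128
```

Note that default_pool_task_slot_count only seeds default_pool when the metadata database is first initialized. In an existing deployment, the live slot count is the one shown for the pool in the UI. Either way, with max_active_runs_per_dag = 1 and a single task per run, sys_liveness needs one slot at a time, far below these limits.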
Anything else
When we restart the scheduler, any queued tasks are executed, but the next task is not and gets stuck in queued again.
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Hey! I'm facing the same issue and I have a similar configuration. I'm going to share what I tested so far, but first a bit of background:
- I had a perfectly functional 2.2.3 on Kubernetes [EKS 1.21] (installed with the bitnami/airflow Helm Chart).
- I upgraded straight to 2.3.4 by running airflow db upgrade, and everything was fine (for the first 10 minutes).
- Then tasks stopped being scheduled, with no recognizable pattern: some would run when triggered manually but would not start when their scheduled time came, or the other way round. Totally random, no clue.
This is what I tested so far, without much luck:
- I thought something was wrong with the metadata store (PostgreSQL in my case), so I cleaned up some DAGs' history by deleting and re-importing them, but without success.
- Restarting the scheduler would (sometimes) restart the queued tasks, but it would eventually stall again.
- I created a new namespace with a fresh 2.3.4 installation and tested some DAGs; the problem is still there.
I think something is wrong with this version, and I will roll back to 2.2.5 tomorrow because I have some production workloads that I would like to keep running.
If I find out more, I will reply here.
Hi everyone,
I am facing the same issue. After updating to 2.3.4, some of my scheduled tasks would not run: the DAG run itself would be labeled as running, but the first task in the DAG would get set to queued and never run. The scheduler reports the same messages:
INFO - task TaskInstanceKey(dag_id='...', task_id='...', run_id='scheduled__2022-09-16T12:49:00+00:00', try_number=1, map_index=-1) is still running
scheduler [2022-09-16T12:58:17.664+0000] {base_executor.py:215} ERROR - could not queue task TaskInstanceKey(dag_id='....', task_id='...', run_id='scheduled__2022-09-16T12:47:00+00:00', try_number=1, map_index=-1) (still running after 4 attempts)
Running the task manually succeeds, but scheduled tasks do not run. If I drop the dag_runs from the DB, the first scheduled DAG run succeeds, but every run after it halts as described above.
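When comparing reports like these, it can help to count which task instances the executor actually gave up on. The following small sketch scans scheduler log lines for the "could not queue task" message and extracts the TaskInstanceKey fields. The regex assumes the key is printed exactly in the repr format shown in the logs above. dropped_tasks is a hypothetical helper.

```python
import re
from collections import Counter

# Matches the TaskInstanceKey(...) text as it appears in the scheduler logs above.
KEY_RE = re.compile(
    r"TaskInstanceKey\(dag_id='(?P<dag_id>[^']*)', task_id='(?P<task_id>[^']*)', "
    r"run_id='(?P<run_id>[^']*)', try_number=(?P<try_number>\d+), "
    r"map_index=(?P<map_index>-?\d+)\)"
)


def dropped_tasks(lines):
    """Count 'could not queue task' errors per (dag_id, task_id, run_id)."""
    dropped = Counter()
    for line in lines:
        if "could not queue task" not in line:
            continue  # skip the INFO "is still running" lines and everything else
        match = KEY_RE.search(line)
        if match:
            dropped[(match["dag_id"], match["task_id"], match["run_id"])] += 1
    return dropped
```

The input can be any iterable of lines, for example a file of saved scheduler pod logs opened with open().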
Update:
The downgrade (to 2.2.5) was unsuccessful, so I ran airflow db erase to get rid of scheduler errors caused by a misaligned schema in the metadata store. The fresh 2.2.5 installation has performed just fine (so far). I'm going to leave things as they are for now and follow the outcomes of the other issues on the same topic.
You can also try 2.4.0, released today. It contains many bug fixes, and while we cannot pinpoint the problem to a specific fix, several similar issues were fixed there.
Not sure if this is related, but I'm also seeing some weird behavior with the k8s executor and pod_override. See details here. Then, when I clear the task from the UI, I get errors similar to the ones posted above.
Finally found the same issue as ours! We tried 2.4.0 and it doesn't help: tasks are still stuck in queued with the same behavior. Downgrading to 2.3.3 fixes the issue.
Same problem with cncf-kubernetes 4.3.0, for any Airflow version > 2.3.3.
This is impacting only one of our DAGs (we're on version 2.3.4), but it isn't clear why it is happening.
Please see if you can create a minimal reproducible example; otherwise it's hard to find the root cause.
Hello @eladkal,
I work with @bertrand-buffat; allow me to add some details so the problem is described as precisely as possible.
Kubernetes cluster version: 1.21; AWS EKS platform version: eks.10 (https://docs.aws.amazon.com/eks/latest/userguide/platform-versions.html#:~:text=April%204%2C%202022-,Kubernetes%20version%201.21,-The%20following%20admission)
Here is an example DAG that works on v2.2.2 and fails on v2.4.0:
Thanks for your help 🙂
# plugins/models/airflow.py
from kubernetes.client import models as k8s
from airflow.models import DAG


class DataKubeDAG(DAG):
    def __init__(self, dag_id, default_args, tags=None, **kwargs):
        self.log.info(f"Loading dag: {dag_id}")
        # Tag every DAG with its owner, followed by any explicit tags.
        full_tags = [default_args["owner"]]
        full_tags += tags or []
        super().__init__(dag_id=dag_id, default_args=default_args, tags=full_tags, **kwargs)

    def add_task(self, task):
        # Datadog autodiscovery annotation identifying the DAG and task.
        custom_annotations = k8s.V1ObjectMeta(
            annotations={"ad.datadoghq.com/tags": f'{{"airflow_dag_id": "{self.dag_id}", "airflow_task_id": "{task.task_id}"}}'}
        )
        if task.executor_config.get("pod_override") is None:
            task.executor_config = {"pod_override": k8s.V1Pod()}
        # Replaces any metadata already set on the pod_override.
        task.executor_config["pod_override"].metadata = custom_annotations
        super().add_task(task)
# functions/default_args.py
from datetime import timedelta

from airflow.utils.dates import days_ago
from kubernetes.client import models as k8s


def obtain_default_args(owner: str = "me", retries: int = 6):
    return {
        "owner": owner,
        "depends_on_past": False,
        "email": "[email protected]",
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": retries,
        "retry_delay": timedelta(minutes=5),
        "start_date": days_ago(1),
        # Not an operator argument: catchup only takes effect when passed to the DAG.
        "catchup": False,
        "execution_timeout": timedelta(seconds=86400),
        "priority_weight": 1,
        "executor_config": {"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={}))},
    }
# DAG definition
from airflow.operators.dummy_operator import DummyOperator
from functions.default_args import obtain_default_args
from plugins.models.airflow import DataKubeDAG

default_args = obtain_default_args()
default_args["retries"] = 0

with DataKubeDAG(
    dag_id="sys_liveness",
    schedule_interval="*/5 * * * *",
    default_args=default_args,
    catchup=False,
) as dag:
    start_task = DummyOperator(task_id="liveness")
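The add_task override in this example builds the Datadog annotation by hand with an f-string and then replaces the whole pod_override metadata. Doing so would discard any labels or annotations already set there. The following sketch uses plain dicts so it runs without the kubernetes client. json.dumps produces the identical annotation value while escaping quotes safely, and merge_annotations keeps existing annotations. Both datadog_tags_annotation and merge_annotations are hypothetical helpers. This thread does not establish whether the override has anything to do with the stuck tasks.

```python
import json


def datadog_tags_annotation(dag_id: str, task_id: str) -> dict:
    """Build the same 'ad.datadoghq.com/tags' annotation as the f-string version."""
    # json.dumps escapes quotes and backslashes that a hand-written f-string would not.
    value = json.dumps({"airflow_dag_id": dag_id, "airflow_task_id": task_id})
    return {"ad.datadoghq.com/tags": value}


def merge_annotations(existing, extra):
    """Return a new dict so annotations already on the pod_override are kept."""
    merged = dict(existing or {})
    merged.update(extra)
    return merged


print(merge_annotations({"team": "data"}, datadog_tags_annotation("sys_liveness", "liveness")))
```

With the kubernetes client, you would assign the merged dict to the annotations of the pod's V1ObjectMeta, creating one if metadata is None, instead of replacing metadata outright.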
Can you all please try 2.4.1 (released yesterday together with the latest cncf.kubernetes provider) and see if your problems are fixed? I believe a few bugs that caused scheduling to get stuck were fixed there, so it is quite likely your issues have been fixed.
We have no logs to "prove" this is the case, but trying it out is the easiest way for everyone to check without spending too much time on guesswork. @ajbosco @lucas481516 @george-zubrienko
If the issue still persists, I am afraid we will need more complete logs from the scheduler and the Kubernetes pods from around the time it happens; otherwise it will be very difficult to determine the cause.
I will try it this week around Wednesday. Was on vacation :)
Got a bit delayed; I will test tomorrow and come back with results. Sorry.
@potiuk We no longer observe the issue on the following setup:
Kubernetes Executor (1.22.11) / Airflow 2.4.1-python3.9 / Helm Chart 1.7.0 with dag processor enabled.
Cool, closing it as non-reproducible.