Tasks stuck in queued state
Apache Airflow version
2.2.3 (latest released)
What happened
Tasks are stuck in the queued state and will not be scheduled for execution. In the logs, these tasks have a message of could not queue task <task details>, as they are currently in the queued or running sets in the executor.
What you expected to happen
Tasks run :)
How to reproduce
We have a dockerized airflow setup, using celery with a rabbit broker & postgres as the result db. When rolling out DAG updates, we redeploy most of the components (the workers, scheduler, webserver, and rabbit). We can have a few thousand Dagruns at a given time. This error seems to happen during a load spike when a deployment happens.
Looking at the code, this is what I believe is happening:
Starting from the initial debug message of could not queue task I found tasks are marked as running (but in the UI they still appear as queued): https://github.com/apache/airflow/blob/main/airflow/executors/base_executor.py#L85
Tracking through our logs, I see these tasks are recovered by the adoption code, and the state there is STARTED (https://github.com/apache/airflow/blob/main/airflow/executors/celery_executor.py#L540).
Following the state update code, I see this does not cause any state updates to occur in Airflow (https://github.com/apache/airflow/blob/main/airflow/executors/celery_executor.py#L465). Thus, if a task is marked as STARTED in the results db, but queued in the airflow task state, it will never be transferred out by the scheduler. However ,you can get these tasks to finally run by clicking the run button.
Operating System
Ubuntu 20.04.3 LTS
Versions of Apache Airflow Providers
No response
Deployment
Other Docker-based deployment
Deployment details
No response
Anything else
I believe this could be addressed by asking celery if a task is actually running in try_adopt_task_instances. There are obvious problems like timeouts, but celery inspect active|reserved can return a json output of running and reserved tasks to verify a STARTED task is actually running
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template!
We're facing this as well after upgrading to 2.2.3 from 2.1.0
Plus one here, noticing this frequently after bump from 2.1.4 to 2.2.3
This is not ideal, but for those who want to kick all their queued tasks to running, here's a snippet i've been using:
from airflow import models, settings
from airflow.executors.executor_loader import ExecutorLoader
session = settings.Session()
tis = session.query(models.TaskInstance).filter(models.TaskInstance.state=='queued')
dagbag = models.DagBag()
for ti in tis:
dag = dagbag.get_dag(ti.dag_id)
task = dag.get_task(ti.task_id)
ti.refresh_from_task(task)
executor = ExecutorLoader.get_default_executor()
executor.job_id = "manual"
executor.start()
executor.queue_task_instance(ti, ignore_all_deps=False, ignore_task_deps=False, ignore_ti_state=False)
executor.heartbeat()
I think this is being handled in one of the fixes coming to 2.2.4 (@ephraimbuddy ?)
I'm getting the same message but different symptoms on a similar setup. CeleryKubernetesExecutor, RabbitMQ, and MySQL backend. For me, the task gets marked failed with no logs.
A sensor in reschedule mode was doing it's periodic run-sleep-run pattern. On one of the later runs, I got the could not queue task instance message. I grabbed the external_id from the logs and searched for it in Flower, and it turns out the celery task actually succeeded (that is, performed its check -- the sensor itself didn't "succeed"). However, the Airflow state was still recorded as failed.
I think some kind of desync between the states known by Airflow and the executor is causing this, but I'm not sure why the results of the successfully executed task weren't reported, even though Airflow thought it couldn't be queued (which it was).
I think this is being handled in one of the fixes coming to 2.2.4 (@ephraimbuddy ?)
Yes. Here https://github.com/apache/airflow/pull/19769
Thanks @ephraimbuddy ,
Looking at the PR, I'm not sure if it will address this problem. Notably, tasks are added to the running set by the abandoned task code, which means this line will skip over all these forever queued tasks:
https://github.com/apache/airflow/pull/19769/files#diff-ac6d6f745ae19450e4bfbd1087d865b5784294354c885136b97df437460d5f10R417
I had similar problem, it was happening, because KubernetesExecutor is picking CeleryExecutor tasks on CeleryKubernetesExecutor
Celery is changing task state to queued and KubernetesExecutor to scheduled, it is happening over and over again(depends how fast task gets to running state)
I fixed it by adding additional filter on task queue(which defaults to 'kubernetes' ) for KubernetesExecutor in couple of places, below is probably the one that is causing most of the problems: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/executors/kubernetes_executor.py#L455
It is taking all of the tasks in queued state, but it should take only those that should run on Kubernetes - has queue equaled to: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/config_templates/default_airflow.cfg#L743
That is why there is log entry: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/executors/base_executor.py#L85
Because celery gets the same task that is already inside queued_tasks
If this is the case then you will see following messages in the logs as well: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/executors/kubernetes_executor.py#L494
i'm still having this issue on 2.2.5 any updates on this ticket or when it may get fixed. In the mean time i'm using the code snippet provided by @Chris7
i'm still having this issue on 2.2.5 any updates on this ticket or when it may get fixed. In the mean time i'm using the code snippet provided by @Chris7
Some parts of this issue is marked as fixed. You likely experience other issue. But we will never know unless you open a new issue and provide all the details, your logs and circumstances that you experience the problem. Unless you do that - there is no chance your issue will be handeven looked at (and even less likely fixed, maybe accidentally) - because no-one has any idea about Your issue details.
Just be aware @meetri that you get the software for free - and a lot of the people contribute to it for free. And the best value you can bring is to spend some time on detailing your problems. providing as much as you can - including the circumstances, details and reproducibility. This way the smallest help you can provide for those who try to help people like you - on a free forum, for - again - software that you paid precisely 0 USD for.
https://github.com/apache/airflow/issues/21225#issuecomment-1035656930 @arkadiusz-bach - any workaround that helped? I tried changing queue name from default kubernetes to kubernetesQ like below but didn't help
[celery_kubernetes_executor]
kubernetes_queue = kubernetesQ
But I still see K8S executor picking up tasks from default queue(celery) which are in queued state
[[34m2022-04-18 11:10:05,800[0m] {[34mkubernetes_executor.py:[0m454} INFO[0m - Found 2 queued task instances[0m
[[34m2022-04-18 11:10:05,823[0m] {[34mkubernetes_executor.py:[0m491} INFO[0m - TaskInstance: <TaskInstance: dbr_concurrent_trigger_poll.gcorp_ccl.trigger_descision_for_source scheduled__2022-04-18T11:05:00+00:00 [queued]> found in queued state but was not launched, rescheduling[0m
I see a PR https://github.com/apache/airflow/pull/23048 to permanently fix this but it might take sometime to come w/ next stable AF release
-
Download zip with Airflow version you have from pypi
-
Check in which section inside config kubernetes_queue is defined for your Airflow version. I think that it was moved from kubernetes section to celery_kubernetes_executor, but im not sure, just check that :)
-
In kubernetes_executor.py:
- At the beginning import conf and SQLAlchemy 'and_' operator:
from airflow.configuration import conf from sqlalchemy import and_ - Inside __init__ of KubernetesExecutor class:
self.is_celery_kubernetes_executor = conf.get('core', 'executor') == 'CeleryKubernetesExecutor': self.kubernetes_queue = conf.get('celery_kubernetes_executor', 'kubernetes_queue') - Replace
https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/executors/kubernetes_executor.py#L455
With:
if self.is_celery_kubernetes_executor: queued_tasks = session.query(TaskInstance).filter(and_(TaskInstance.state = State.QUEUED, TaskInstance.queue == self.kubernetes_queue)).all() else: queued_tasks = session.query(TaskInstance).filter(TaskInstance.state = State.QUEUED).all()
- At the beginning import conf and SQLAlchemy 'and_' operator:
-
Replace kubernetes_executor.py file with patched one - it will be somewhere inside site-packages directory under python directories
PS: I just wrote that in notepad, so please check whether there are no syntax errors
Got it! see similar fix in PR too. so no workaround w/o code patching, which is also evident as the query(inside k8s executor) on task instances is looking only queued state and not queue(celery/k8s). Thanks
Just be aware @meetri that you get the software for free - and a lot of the people contribute to it for free. And the best value you can bring is to spend some time on detailing your problems. providing as much as you can - including the circumstances, details and reproducibility. This way the smallest help you can provide for those who try to help people like you - on a free forum, for - again - software that you paid precisely 0 USD for.
Understood, and thanks you to everyone that contributes to this project. If needed I can provide whatever information to track down this issue. I posted my comment only because I thought it would be of use for anyone passing through that even the latest version of the code has this issue and how I got around it for the time being.
This may or may not be of help but I did since my post tune things to such an extent to where I didn't need to run the script posted by Chris7, https://github.com/apache/airflow/issues/21225#issuecomment-1029060830
Firstly, I'm running this in amazon in an EKS cluster.
basically, the biggest issue was I didn't have enough memory allocated to the scheduler by a lot. I bumped it up to about 8GB minimum. This stopped a lot of the errors related to the scheduler missing heartbeats.
Secondly, i upped the number of cores at minimum to 2 per container. I am running 2 airflow scheduler replicas.
Lastly, I'm not sure how big of an impact this was probably pretty major. But given my pods autoscale up and down on demand based on a redis queue. I believe that pods were getting scaled down and the workers were being force terminated. Once i added a dumb-init to the airflow worker docker image that problem seemed to go away.
I'm only posting this if anyone is having similar issues. This may or may not be a bug in the airflow.
hello @meetri, could you please provide more info about you modification to Docker image? I also noticed that I'm getting "lost" Queued tasks when turning on Redis-based autoscaling but can't understand how some kind of dumb-init can help to avoid an issue in this situation.
Is anyone experienced this issue after 2.3.1? I would imagine that https://github.com/apache/airflow/pull/23690 should make things better... even if not fixing the issue but making sure that tasks don't get stuck in queue state.
We have been sing tasks stuck in a queued state on Airflow 2.3.2. Not many of them, but a few just get stuck in Queued and I have to go either clear them or run them.
@patricker - can you please update to 2.3.4 and see it solves your problem? You will have to do it sooner or later (and it has dozens of fixes) and might be fastest way for you (and us) to fix your problem.
Hello there 👋
We experienced the same issue in our deployment cluster, and after a lot of searching, I think I have found why it is happening as some reproductible steps to provoke it. I am very willing to submit a PR if I may.
First off, we experienced this issue very rarely on our deployment cluster. Our deployment cluster is right now on AWS EKS, Airflow version 2.3.3, with redis message broker and CeleryExecutor of course :). After a lot of searching, I found out that the queued task (or the queued tasks, I don't know how many we had because we have 1000+ dags and even more tasks), which didn't want to get executed appeared when our AWS autoscaler was doing a downscaling of our pod. At that point, when I looked at the two pods celery workers which were downscaled, I saw a a log message in the pod :
worker: Warm shutdown
this log message appeared a couple of milliseconds after the scheduler has sent the task to redis. What if, the worker had consumed this message, but exactly at the same time, celery had shutdowned the consumer loop, and the message never get properly executed. BUT, since the message has been consumed, (LPOP), redis didn't have the message in its queue, but the Celery Executor still had it ! That's why we have the log (after recleaning the task):
[2022-08-27 21:15:40,100] {base_executor.py:93} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', 'manual__2022-08-27T19:13:04.614822+00:00', '--local', '--subdir', '/Users/arthurvolant/dev/python/airflow/airflow/example_dags/example_bash_operator.py']
[2022-08-27 21:15:40,101] {base_executor.py:213} INFO - task TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) is still running
[2022-08-27 21:15:40,205] {base_executor.py:213} INFO - task TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) is still running
[2022-08-27 21:15:41,292] {base_executor.py:213} INFO - task TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) is still running
[2022-08-27 21:15:42,391] {base_executor.py:213} INFO - task TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) is still running
[2022-08-27 21:15:43,498] {base_executor.py:217} ERROR - could not queue task TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) (still running after 4 attempts)
I guess, that's why there is a visibility_timeout, but unfortunately, ours is 25hours and some of our tasks cannot suffer any delay (maximum half an our). Anyway, here are the steps to reproduce it :
- Start a basic cluster / docker dev env (those steps still work on the latest commit, Airflow 2.3.4.dev)
breeze start-airflow -p 3.10 -b postgres --integration redis --db-reset - Start a dag (In my example, I am starting example_bash_operator.
- Tasks will be scheduled and put into queue by the CeleryExecutor, but not executed since not worker is running
- Connect to redis and LPOP one or more messages :
docker exec -it docker-compose_redis_1 redis-cliandLPOP default - Activate the celeryWorker.
- Boom 💥 . All the tasks which are still in queued are run, except the one(s) you popped out of the queue.
I will try to think of something. I have some ideas and submit a PR as soon as possible since this is very stressful for our deployment.
And here is a picture:

I'm using airflow 2.2.5 in cloud-composer 2.0.24 on GCP and I have noticed the same kind of error. My investigation also lead me to think that the issue happens when the autoscaler stop worker pods.
@V0lantis task might stuck in the queued state at least due to:
- Redis crashed after receiving the task and it had no time / was not configured to save its state to disk, so the task was lost, but scheduler thinks that task is waiting to be picked up workers
- terminationGracePeriodSeconds on your worker PODs is too low or it is not there at all.(default is 60 seconds)
This message worker: Warm shutdown means that celery received SIGTERM signal and it started gracefull shutdown - it is not going to pick any more tasks from redis queue and it will wait for all of the running tasks to finish
But if you've got some tasks that may be running for longer than terminationGracePeriod then Kubernetes might send SIGKILL first and:
- Celery will not be able to wait for all of the running tasks to finish(those will end with failed status)
- it was able to pick the task from queue(before SIGTERM), but not able to change its state to running(After SIGKILL) - maybe your case
Also some of the celery workers might receive SIGKILL signal, when there is not enough memory allocated and it may led to the same behaviour, unfortunately you may not see OOM events in the kubernetes cluster when it happens, becaue when there is more than one process running on the container in Kubernetes then it is chosing randomly one of the child processes within container and sends SIGKILL(Celery is running with Main process and child processes(workers)).
If the Main process receives SIGKILL you will probably see OOM event(container will restart/stop), but if child then tasks it was processing will fail(in the logs you will be able to see that it received SIGKILL singal) or stuck in queued state if it was able to pick it and container will be running fine, - one of the celery processes will be restarted
Hey @arkadiusz-bach,
I am sorry, that's true that I haven't posted any logs, maybe I haven't been clear as well...
@V0lantis task might stuck in the queued state at least due to:
- Redis crashed after receiving the task and it had no time / was not configured to save its state to disk, so the task was lost, but scheduler thinks that task is waiting to be picked up workers
We use ElasticCache service. The service is configured to store data on disk and is oversized for the actual amount of tasks it received and as stated above, celery actually acknowledged when tasks are successfully sent to queue; otherwise, it fails.
- terminationGracePeriodSeconds on your worker PODs is too low or it is not there at all.(default is 60 seconds)
The warm shutdown was received 10 minutes before the pod was actually killed. But the task was sent and acknowledged by CeleryExecutor with:
Setting external_id for <TaskInstance: example_bash_operator.this_will_skip scheduled__2022-08-28T00:00:00+00:00 [queued]> to 979a100a-5012-4135-b01f-b93577b39ede
in the scheduler_log, which actually means that the task was received by redis (or celery would through a connection error).
This message
worker: Warm shutdownmeans that celery received SIGTERM signal and it started gracefull shutdown - it is not going to pick any more tasks from redis queue and it will wait for all of the running tasks to finish
Yep, but as I said in my comment :
this log message (
worker: Warm shutdown) appeared a couple of milliseconds AFTER the scheduler has sent the task to redis.
⬆️ I still think that Celery consumer exited during the handling of my message here and didn't have time to put it in the unacked hash, which stores unacknowledged tasks.
But if you've got some tasks that may be running for longer than terminationGracePeriod then Kubernetes might send SIGKILL first and:
- Celery will not be able to wait for all of the running tasks to finish(those will end with failed status)
- it was able to pick the task from queue(before SIGTERM), but not able to change its state to running(After SIGKILL) - maybe your case
⬆️ I stil think that my pods received SIGKILL 10 minutes after the SIGTERM but I haven't been able yet to check the terminationGracePeriod
Also some of the celery workers might receive SIGKILL signal, when there is not enough memory allocated and it may led to the same behaviour, unfortunately you may not see OOM events in the kubernetes cluster when it happens, becaue when there is more than one process running on the container in Kubernetes then it is chosing randomly one of the child processes within container and sends SIGKILL(Celery is running with Main process and child processes(workers)).
Already checked as well. We have a Grafana dash to monitor the hardware resources that the pod is consuming, and the workers were far below the threshold we gave them
If the Main process receives SIGKILL you will probably see OOM event(container will restart/stop), but if child then tasks it was processing will fail(in the logs you will be able to see that it received SIGKILL singal) or stuck in queued state if it was able to pick it and container will be running fine, - one of the celery processes will be restarted
The thing is, the task never started. Thanks anyway for your input, it was really enriching :)
I have been trying to fix this issue in celery/kombu where I beleive(d) was lying the issue we are talking about here (namely, that a SIGTERM was sent while the redis queue was consumed, leading to a message never being acknowledged and a message lost in the wild).
The issue is, I have been trying to replicate the issue in kombu without any success. I managed to reproduce the issue directly in Airflow by RPOP a msg before calling celery worker and it worked perfectly as described at the beginning of this issue. But, at least in its last version (Celery v.5.2.7) which is the one we have in production, I can't reproduce the behavior (Sending a SIGTERM just after kombu has BRPOP the msg). The msg is rightfully acked, and rightfully put back in the queue after SIGTERM had been catched by python.
I don't really know where to go from here now.
I am putting the logs msg we have been having in our prod environment. Maybe someone else will have a better idea :
Scheduler logs
<TaskInstance: dag_id.task_id scheduled__2022-08-24T15:00:00+00:00 [scheduled]>
<TaskInstance: dag_id.task_id scheduled__2022-08-24T15:00:00+00:00 [scheduled]>
[2022-08-24 16:23:50,060] {cheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='dag_id', task_id='task_id', run_id='scheduled__2022-08-24T15:00:00+00:00', try_number=1, map_index=-1) to executor with priority 5 and queue default
[2022-08-24 16:23:50,060] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'dag_id', 'task_id', 'scheduled__2022-08-24T15:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_id.py']
[2022-08-24 16:23:50,264] {cheduler_job.py:605} INFO - Executor reports execution of dag_id.task_id run_id=scheduled__2022-08-24T15:00:00+00:00 exited with status queued for try_number 1
[2022-08-24 16:23:50,276] {cheduler_job.py:632} INFO - Setting external_id for <TaskInstance: dag_id.task_id scheduled__2022-08-24T15:00:00+00:00 [queued]> to 139960c0-263d-4891-b1b0-712b077a0d2b
<TaskInstance: dag_id.task_id scheduled__2022-08-24T15:00:00+00:00 [scheduled]>
<TaskInstance: dag_id.task_id scheduled__2022-08-24T15:00:00+00:00 [scheduled]>
There were two workers which were downscaled with similar logs:
1rst worker
[2022-08-24 16:23:49,597: WARNING/ForkPoolWorker-254] unknown id type, added to user and store ids columns: create_account_step.fingerprint
worker: Warm shutdown (MainProcess)
[2022-08-24 16:23:49,777: WARNING/ForkPoolWorker-254] unknown id type, added to user and store ids columns: login.fingerprint
The log above is not relevant. The relevant information here is that the worker received shutdown SIGTERM signal around 0.3 seconds before the task id was actually sent.
I think a really good way to start wil be to upgrade Airflow to 2.4.0 once it is out (even today possibly?) and install it with the right constraints (so that the right celery /kombu packages get installed). That sounds like it could have been handled already in the meantime by some of the recent Airflow fixes or - more likely - recent celery/kombu updates.
I think if this is confirmed for 2.4.0, we will have in general more capacity to look more closely to such issues and see if this can be reproduced and fixed.
We will try this @potiuk, thank you !
It's been released BTW :)
We got similar issue (Airflow 2.3.3), it happened right after worker suffered connection loss to Redis who later did not consume any tasks for 3 days, despite showing up as healthy in Flower. One thing i suspect is that reconnect attempt got stuck in an infinite timeout. Python sockets have default no timeout, same default applies in Celery and redis-py. (This would not explain why flower showed node as healthy though)
Discussed in https://github.com/redis/redis-py/issues/722 and https://github.com/redis/redis-py/issues/2243
In airflow, the celery_broker_transport_options option can be used to set the options discussed there.
Hi we have a similar issue on Airflow 2.4.0. The issue happens after I set the pool too high, which leads to locking issue on Databases (the exact error was "Lock wait timeout exceeded; try restarting transaction"). The task was unable to mark failed for a while and after they did, the pool was kinda corrupted, which leads to the task being stuck in the queue (the last pool). The other pool is running fine, task is still running and scheduled as normal
I do not have any direction to troubleshoot this issue at the moment, but if someone could give me pointers on where to check, I am glad to help troubleshoot the issue
