Sensors stuck in queued after being spot interrupted
Apache Airflow Provider(s)
cncf-kubernetes
Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==8.3.2 apache-airflow-providers-common-io==1.3.2 apache-airflow-providers-http==4.12.0 apache-airflow-providers-postgres==5.11.2
Apache Airflow version
2.9.2
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Other Docker-based deployment
Deployment details
We deploy Docker containers based on the official Airflow images (apache/airflow:slim-2.9.2-python3.11) to an AWS EKS kubernetes cluster running Kubernetes 1.29.
What happened
When a sensor is stopped due to a spot interrupt, it can end up stuck in the queued state, never getting into the `running state. In the scheduler logs, this information is reported:
~(12 other attempts omitted)~
[2024-07-01 08:49:47,283] {base_executor.py:284} INFO - queued but still running; attempt=13 task=TaskInstanceKey(dag_id='test', task_id='python-sensor-1', run_id='scheduled__2024-06-30T00:00:00+00:00', try_number=1, map_index=-1)
[2024-07-01 08:49:48,316] {base_executor.py:287} ERROR - could not queue task TaskInstanceKey(dag_id='test', task_id='python-sensor-1', run_id='scheduled__2024-06-30T00:00:00+00:00', try_number=1, map_index=-1) (still running after 13 attempts)
Restarting the scheduler allows the sensors to recover and get into the running state again.
What you think should happen instead
Sensors should be able to recover from a spot interrupt without needing to restart the scheduler.
I have tested multiple versions of the apache-airflow-providers-cncf-kubernetes provider, and this used to work correctly up until version 8.0.1. Starting from version 8.1.0, this has been broken (I've tested this on 8.1.0, 8.2.0, 8.3.1 and 8.3.2). This seems to be a regression.
How to reproduce
I've used the following definition to test this behavior. This will just start a number of Python sensors which will never succeed and keep running until timeout.
from airflow import DAG
from airflow.sensors.python import PythonSensor
with DAG(
"test",
schedule_interval="@daily",
max_active_runs=1,
) as dag:
for i in range(10):
PythonSensor(
task_id=f"python-sensor-{i}",
mode="reschedule",
timeout=timedelta(hours=6),
poke_interval=timedelta(seconds=60),
python_callable=lambda: time.sleep(1200)
)
To trigger the behavior, one needs to spot interrupt the node where a sensor is running, but not the one where the scheduler is running. Restarting the scheduler allows the task to recover, so we want to only spot interrupt the task, while the scheduler stays healthy. You can look for a node matching these two constraints using k9s or kubectl, and then use the AWS console to initiate an interruption on the spot request for that node (can be done from the EC2 service).
Anything else
By following the reproduction steps as described, I can trigger this issue every time.
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
What's the setting for task_queued_timeout in this environment?
It's at the default value of 600 seconds, but the sensors don't get set to failed after that window either. (Unless I misunderstand the unit of the setting, and it's actually 10 hours instead of 10 minutes?)
This might be an issue with using the reschedule mode with a poke_interval of 0 seconds. Can this be reproduced with a poke_interval of 60 seconds or more? From the BaseSensor's docstring:
The poke interval should be more than one minute to prevent too much load on the scheduler.
Thanks for wanting to think along on this :)
We're not actually running this sample in production, it was constructed to reliably reproduce the issue. By setting the poke_interval to 0 seconds, you can be quite sure that the spot interrupt will affect the sensor while it's actively running.
That being said, I have updated the sample to match the BaseSensor recommendation, and ran it again on our environment. Unfortunately, the behavior is unchanged, the sensors still get stuck in queued...
I would be surprised if this would be related to the settings however, if I run the same example DAG with version 8.0.1 of the provider (with the same settings), everything works as expected.
I was able to solve this problem by removing the piece of incorrect code as outlined by #41436. When this piece of code is removed, the tasks are processed correctly again.
I am not entirely sure why this is the case, as this piece of code was present on provider version 8.0.1 already. My guess is that the case where the incorrect logic fails was introduced in 8.1.0, thereby uncovering the bug that was already present.
What is the best way of getting the fixes proposed in #41436 merged into the kubernetes provider?
@RNHTTR I would like to contribute a PR to fix this issue.
Do you know if there is a preference from the Airflow Committers for either of the solutions in #41436 (so either removing the piece of problematic code, or adding finalizers to ensure all deletes are properly processed)? If not, do you know who of the other committers could advise on this, or should I just give it a go? Thanks!
Cool, I assigned you to the Issue.
I'm not sure removing this code is the right approach, as #30872 appears to have fixed a bug. Maybe @jedcunningham can weigh in here regarding the recommended approach
Let's see. So #28871 attempted to fix the the missing "TaskInstance Finished" log line, and #30872 was a fix of that fix. With that if block from #30872 gone, I'd expect you'll see multiple of those "TaskInstance Finished" log lines.
Feels like a race condition between Airflow detecting the "Succeeded" -> deleting it itself and the pod being deleted elsewhere. I'm a little surprised the watcher doesn't see the initial "Succeeded" too though?
Maybe the right answer is to, instead, only emit to the event_buffer if the task is in self.running. As in, don't try to protect against the duplicate events with the if block in process_status, but instead in _change_state using the running list. @vlieven, can you take a stab at attempting that fix?
Thanks for the suggestion @jedcunningham! It seems that the _change_state function was just missing an early return.
I've opened a PR at #42624 that adds it.