execution_timeout not enforced, task hangs up
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
Version: 2.5.1 Run env: MWAA on AWS
Summary: Approximately once every ~500-1000 runs, the task hangs indefinitely until it is manually killed, preventing any other task from being scheduled for this DAG; its execution_timeout is not enforced.
In my experience, it only happens on tasks that consume from Kafka using the confluent_kafka library. The execution_timeout is enforced in other tasks.
DAG definition code:
# Dag Info
default_args = {
    "retries": 3,
    "on_failure_callback": on_failure_callback,
    "sla": timedelta(hours=2),
    "execution_timeout": timedelta(hours=4),
}


@dag(SERVICE_NAME,
     default_args=default_args,
     schedule_interval="*/5 * * * *",
     start_date=pendulum.datetime(2023, 7, 3, 9, tz="UTC"),
     catchup=True,
     tags=['critical', 'dumper', 'kafka'],
     max_active_runs=1)
def process_records():
    ingest_from_kafka_and_save()
The ingest_from_kafka_and_save() function contains code that consumes from Kafka, providing a callback function to the consumer (which I suspect may have something to do with the problem, since it runs asynchronously).
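For illustration, here is a minimal sketch (not the actual DAG code) of the kind of consumption loop ingest_from_kafka_and_save() might perform with confluent_kafka; the broker address, group id, topic name, the save() helper, and the on_assign callback are all placeholders standing in for the callback-based consumption described above:

```python
from confluent_kafka import Consumer, KafkaError


def on_assign(consumer, partitions):
    # Runs asynchronously from inside poll() once partitions are assigned
    print(f"Assigned: {partitions}")


def save(value):
    ...  # placeholder for the "save" part of ingest_from_kafka_and_save()


consumer = Consumer({
    "bootstrap.servers": "broker:9092",   # placeholder broker
    "group.id": "dumper",                 # placeholder consumer group
    "auto.offset.reset": "earliest",
    "enable.partition.eof": True,         # emit an event at the end of each partition
})
consumer.subscribe(["records"], on_assign=on_assign)  # placeholder topic

try:
    while True:
        msg = consumer.poll(1.0)          # blocking call into librdkafka (C)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                break                     # consumed until the end of the partition
            raise RuntimeError(msg.error())
        save(msg.value())
finally:
    consumer.close()
```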
It's hard to reproduce since it is intermittent and only happens once in a while. The Audit Log does not show anything special - the task just seems to hang indefinitely. The consumption code itself works fine otherwise and has been running for months in this and other DAGs that use it - but those DAGs also show the same behaviour.
What you think should happen instead
The execution_timeout should be enforced and the task should be killed so that a new one can be scheduled.
How to reproduce
It is hard to reproduce, since it happens very infrequently.
- Create a DAG with the definition from the "What happened" section
- Add a function with a basic Kafka consumer that reads from a Kafka topic until the end of the topic partitions (or a maximum number of messages)
- Leave it running and wait for the problem to occur
Operating System
MWAA on AWS
Versions of Apache Airflow Providers
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.10.txt" apache-airflow-providers-amazon apache-airflow-providers-snowflake==4.0.2 apache-airflow-providers-mysql==4.0.0 apache-airflow-providers-slack confluent-kafka==2.1.0
Deployment
Amazon (AWS) MWAA
Deployment details
Medium-sized cluster on version 2.5.1; latest update applied 2 weeks ago.
Anything else
It is unclear what triggers the error - but whatever it is, the task should be killed so that the execution_timeout is enforced.
It seems like an internal thread-management issue.
Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so - no need to wait for approval.
Hi @xmariachi - debugging your reported problem is hard, and it can be caused by multiple circumstances, some outside of Airflow's control. When the node/pod hangs, can you exec into it and produce a Python stack trace from the running instance for diagnosis? You could, for example, follow the description in https://stackoverflow.com/questions/6849138/check-what-a-running-process-is-doing-print-stack-trace-of-an-uninstrumented-py
Thanks @jscheffl, will do. However, this is AWS MWAA - is that feasible there as well?
In general we accept bugs that can be reproduced in the open-source implementation on the latest stable version of Airflow; for that purpose you could try the Docker Compose setup provided in Running Airflow in Docker.
There are several reasons for this:
- The community lacks knowledge of how managed-service deployments are set up
- We do not know what kind of changes the owners of managed Airflow services have made
- There are at least 7 different companies providing Airflow as a service. See Ecosystem: Airflow as a Service
A bit more information about execution_timeout itself: it is implemented using the SIGALRM signal, and this signal might not work in some cases.
This block of code demonstrates it:
import signal
import time


class TimeoutError(Exception):
    ...


def some_func():
    try:
        while True:
            time.sleep(0.1)
    except Exception:
        pass
    print("Nope")


def some_another_func():
    while True:
        time.sleep(0.1)
    print("Nope")


def handler(signum, frame):
    raise TimeoutError("Timeout!")


TIMEOUT = 2

### This one does not fail: the error is caught by the try..except, so instead of a TimeoutError, ``Nope`` is printed
signal.signal(signal.SIGALRM, handler)
signal.alarm(TIMEOUT)
try:
    some_func()
finally:
    signal.alarm(0)

### This one fails, and the TimeoutError is raised
signal.signal(signal.SIGALRM, handler)
signal.alarm(TIMEOUT)
try:
    some_another_func()
finally:
    signal.alarm(0)
This is likely something we cannot do anything about; you need to raise it with the Kafka developers. The problem is that if you have a C library that hangs and does not periodically check for signals, we cannot do much.
https://python.readthedocs.io/en/stable/library/signal.html
> A long-running calculation implemented purely in C (such as regular expression matching on a large body of text) may run uninterrupted for an arbitrary amount of time, regardless of any signals received. The Python signal handlers will be called when the calculation finishes.
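A small self-contained demonstration of that paragraph, using catastrophic regex backtracking as the long-running C call (the input size is arbitrary - adjust it so the match takes noticeably longer than the alarm):

```python
import re
import signal


class TimeoutError(Exception):
    ...


def handler(signum, frame):
    raise TimeoutError("Timeout!")


signal.signal(signal.SIGALRM, handler)
signal.alarm(1)
try:
    # A single call into the C regex engine; catastrophic backtracking keeps it
    # busy far longer than one second, and the handler cannot run until it returns.
    re.match(r"(a+)+$", "a" * 27 + "b")
except TimeoutError:
    print("TimeoutError raised - but only after the C call completed")
finally:
    signal.alarm(0)
```

Even though the alarm fires after one second, the Python-level handler - and therefore the TimeoutError - only runs once the single C-level re.match call has finished.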
Possibly we could think about an additional escalation level and send SIGKILL in such cases after an additional timeout. I think that should be possible, as we fork in the local task job and we have a heartbeating process that is pure Python and could potentially send SIGKILL to the actual "task" process.
WDYT @Taragolis?
I think yes, we need an additional escalation level for the execution timeout.
The problem with the current implementation is that we raise an error in the handler function and have no idea where in the main thread it is actually raised; as a result, user code, third-party libraries, and Airflow components themselves might prevent the task from exiting.
In general it works fine in most cases, which matches the description in this issue: "Once every ~500-1000 runs approximately, the task hangs up infinitely until manually killed".
Just to demonstrate the current flaky behaviour, here is a variant with a somewhat greater probability of surviving the TimeoutError:
import signal
import time


class TimeoutError(Exception):
    ...


def flaky_behaviour():
    ix = 0
    delay = 0.005
    each_second = 1 / delay
    while True:
        # Handle all regular exceptions here
        try:
            # Does the SIGALRM handler raise the error here? Then the code survives.
            # Changing the TimeoutError base class to BaseException would help in this case.
            time.sleep(delay)
        except Exception:
            print("Nope")

        # Handle all exceptions, including ``SystemExit`` (sys.exit) and ``KeyboardInterrupt`` (SIGINT)
        try:
            # Does the SIGALRM handler raise the error here?
            # Well... better luck next time
            time.sleep(delay)
        except:
            print("Serious NOPE!")

        # If the error happens outside of a try block, execution terminates
        time.sleep(delay * 2)

        if ix % each_second == 0:
            print("It's alive!")
        ix += 1


def handler(signum, frame):
    print(f"Raising TimeoutError, MRO: {TimeoutError.mro()}")
    raise TimeoutError("Timeout!")


TIMEOUT = 2

signal.signal(signal.SIGALRM, handler)
signal.alarm(TIMEOUT)
try:
    flaky_behaviour()
finally:
    signal.alarm(0)
There are a couple of different solutions (and combinations) in my head which might make things better (or worse). Maybe it would be better to move this discussion to the dev list for a wider group of people.
Option 1
https://github.com/apache/airflow/blob/de92a81f002e6c1b3e74ad9d074438b65acb87b6/airflow/exceptions.py#L81-L82
Inherit AirflowTaskTimeout from BaseException rather than AirflowException. That should help in the case of try ... except Exception:; it doesn't help if the upstream code uses a bare try ... except:, but such code is a worst practice in Python anyway (a short sketch follows below).
Ideally we could think about changing the inheritance of AirflowException from Exception to BaseException in Airflow 3, making most exceptions internal-only and keeping only a couple of them as part of the public interface.
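A tiny sketch of the effect of Option 1 (the class name here is illustrative, not Airflow's actual exception): an exception deriving from BaseException escapes except Exception, while a bare except: still swallows it:

```python
class TaskTimeoutSketch(BaseException):
    ...


try:
    try:
        raise TaskTimeoutSketch("timeout")
    except Exception:
        print("swallowed")               # not reached: not an Exception subclass
except TaskTimeoutSketch:
    print("escaped to the outer handler")

try:
    raise TaskTimeoutSketch("timeout")
except:                                  # a bare except still catches BaseException
    print("still swallowed by a bare except")
```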
Option 2
In addition to raising the exception, write a TIMEOUT state into the DB backend so that upstream processes can kill the hung process.
In some circumstances this would break silent_fail of BaseSensorOperator, and possibly the on_kill methods.
Option 3
Introduce a new task state TIMEOUT and control the execution timeout in the Scheduler. This one requires a bigger effort to implement (JobRunners, new trigger rules), and I think it also has some side effects, including the fact that some contributors/PMC members would be against a new state since it adds complexity to an already complex system.
How about another option. I think we already use forked local task process execution (depending on the runner - it could also be spawned and cgroups might be involved - but generally forking is the default). I believe that when a task is run, there is one main process (LocalTaskJob) that watches the "child" process and regularly pings the DB with a heartbeat (or so I understand), while the child process is doing the job. I think the parent process does not actually parse the task to know the timeout (so that's a bit of a problem), but we could POTENTIALLY modify the scheduler (which knows the timeout from the serialized DAG) to pass such a timeout to the executor (and subsequently to the parent process) as an additional parameter.
Then, assuming that this parent process does not get into a long-running C job and does not hang, it would be relatively easy to do task-kill escalation - the usual SIGTERM, SIGHUP, SIGKILL dance, with SIGKILL ultimately killing even the most stubborn forked processes. The parent process is not doing much; it merely communicates with the Airflow DB via heartbeats (as I understand it) and waits for the forked process to finish, so the chances that this process will hang are slim.
That would be a pretty robust solution, I think?
I guess this should be implemented on top of the current implementation? Correct me if I'm wrong.
- Try to raise the AirflowTaskTimeout exception
- Have the heartbeat checker also check the timeout, with something as simple (if we work in UTC) as start_time + execution_timeout < current_time. And if it triggers, give some additional grace period before terminating/killing (a rough sketch follows below)
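Not the real LocalTaskJob code - just a rough sketch, with hypothetical names and a hard-coded grace period, of the check described in the second bullet:

```python
from datetime import datetime, timedelta, timezone

GRACE_PERIOD = timedelta(minutes=2)       # assumption: would likely be configurable


def should_escalate(start_time: datetime, execution_timeout: timedelta | None) -> bool:
    """Return True when the supervising process should start killing the task process."""
    if execution_timeout is None:
        return False
    now = datetime.now(timezone.utc)      # everything in UTC, as suggested above
    return start_time + execution_timeout + GRACE_PERIOD < now


# Example: a task that started 5 hours ago with a 4-hour execution_timeout
started = datetime.now(timezone.utc) - timedelta(hours=5)
print(should_escalate(started, timedelta(hours=4)))  # True
```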
Yep, @Taragolis. That would be my idea.
It comes from the assumption that in order to REALLY be able to handle all timeouts you need to do it from a separate process - because, as you rightfully explained, trying to handle things "in-process" is not always applicable. My idea is to add an extra layer of "what to do if the actual task process is not responding" - and I think utilising the parent process (which is already there) to apply such a hard timeout is simplest - without modifying states and without adding yet another layer of monitoring processes or overloading the scheduler.
I think - other than the occasional "whole machine stops working" case - this would handle most situations where the task is not timing out but still continues to do stuff because of a badly written low-level C implementation in the library that is used.
And the "whole machine hangs" case should anyhow be handled at the deployment level (for example, K8s should kill it); also, in that case we will stop receiving heartbeats, and ultimately the Scheduler should handle it even today.
Upon cursory examination I could see only one side effect - or benefit, depending on the case.
Right now a task's email notifications and callbacks are executed in the same process as the task. So it could happen that a task executes successfully and runs on_success callbacks which take some additional time, and at that moment start_time + execution_timeout < current_time becomes true and we kill the process.
That is why I suggest also adding a grace period before the kill, maybe even a configurable one.
> That is why I suggest also adding a grace period before the kill, maybe even a configurable one.
Oh absolutely. The signal "dance" I was mentioning should involve several grace periods. Usually it is implemented as: wait a little, a) send HUP (and wait), b) if the process has not stopped, send TERM (and wait a little more), c) if that does not work, send KILL (and do not wait - there is nothing to wait for; it has happened a few times in my life that a process did not die after SIGKILL, and that was at times when the whole OS/installation got heavily broken).
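A sketch of how that HUP -> TERM -> KILL dance could look in the supervising process (the pid handling and wait times are placeholders, not the actual Airflow implementation):

```python
import os
import signal
import time


def process_alive(pid: int) -> bool:
    """Check for process existence without sending an actual signal."""
    try:
        os.kill(pid, 0)
        return True
    except ProcessLookupError:
        return False


def escalate_kill(pid: int, grace: float = 5.0) -> None:
    """Send HUP, then TERM, then KILL, waiting a grace period between attempts."""
    for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGKILL):
        if not process_alive(pid):
            return                        # the task process exited on its own
        os.kill(pid, sig)
        if sig == signal.SIGKILL:
            return                        # nothing left to wait for after SIGKILL
        time.sleep(grace)                 # give the task process a chance to exit
```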
Seems we understand the nature of the problem and have a plan for how to resolve it, so let me pick up this issue then.
> it has happened a few times in my life that a process did not die after SIGKILL, and that was at times when the whole OS/installation got heavily broken
Thank you guys. I don't have enough knowledge of Airflow internals to chip in much, but your solution sounds sensible.
@xmariachi Hi, I also use MWAA and have come across this behaviour. What is the status of the task when it gets stuck? We had the task in a queued state and noticed that the SQS age in the metrics started to climb. I believe it's an issue with downscaling on MWAA; we added a dag_timeout to our minutely DAG to stop it after 5 minutes.
> @xmariachi Hi, I also use MWAA and have come across this behaviour. What is the status of the task when it gets stuck? We had the task in a queued state and noticed that the SQS age in the metrics started to climb. I believe it's an issue with downscaling on MWAA; we added a dag_timeout to our minutely DAG to stop it after 5 minutes.
In my case, the task is active and remains as such.
@Taragolis was wondering if you have a patch/workaround for this issue?
> @Taragolis was wondering if you have a patch/workaround for this issue?
Airflow 2.8.2 (RC likely tomorrow) should have the fix from #35653 applied - if your problem, @kurtqq, was the swallowed exception. You are absolutely encouraged to subscribe to the devlist and, when we send the email to vote on it, to test whether it solves your problem and report back.
If that was the case, BTW (the exception being swallowed), then the workaround (or rather the proper way of writing your DAG code, or the library that caused it) is to remove the try: ... except Exception: that caused it (you will need to find it by inspecting your code and understanding what your operators and tasks that time out are doing).
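A minimal sketch of that workaround (do_work() is a hypothetical placeholder for the task's real logic): if broad exception handling is unavoidable in versions where AirflowTaskTimeout is still a plain Exception, re-raise it explicitly so the timeout can propagate:

```python
from airflow.exceptions import AirflowTaskTimeout


def do_work():
    ...  # the task's real logic would go here


def task_body():
    try:
        do_work()
    except AirflowTaskTimeout:
        raise                             # let the timeout terminate the task
    except Exception as exc:              # everything else can still be handled locally
        print(f"Recovered from: {exc}")
```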
If there is another reason (like badly written C code in your library that does not handle signals propagated from Python), then the solution is to fix the library (or maybe upgrade to a newer version that is fixed, or raise an issue with whoever created the library) - but only you, knowing your DAGs and the tasks that do not react to the timeout properly, can do that.
That would be about all.
Do we have a strong case not to implement the signal sequence from https://github.com/apache/airflow/issues/35474#issuecomment-1801988073 in the supervising process? This seems generic enough and the only solution for dealing with processes stuck in D-state.
If we just don't have it as a priority, I can create a draft PR for it, but I don't want to start if there is a strong case against implementing it. @potiuk
No. There is nothing against it.