Endless sensor rescheduling if the first sensor run failed to be saved in DB
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.10.3
What happened?
Endless sensor rescheduling happens in reschedule mode if the first sensor run fails to be saved to the DB.
- The sensor was configured in reschedule mode:
@task.sensor(
    task_id="sensor-s3-version",
    poke_interval=5 * 60,
    timeout=50 * 60,
    mode="reschedule",
    soft_fail=True,
)
def sensor_s3_version(connection_id: str, artefact: str) -> PokeReturnValue:
    ...
- The first sensor run failed because the Postgres session was terminated, and as a result no entry was added to the task_reschedule table:
...
[2024-12-18, 11:41:11 UTC] {taskinstance.py:340} ▼ Post task execution logs
[2024-12-18, 11:41:11 UTC] {standard_task_runner.py:124} ERROR - Failed to execute job 71468 for task sensor-s3-version ((psycopg2.OperationalError) server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
...
- Because there is no record in the task_reschedule table matching the try_number == 1 condition, the code here returns no data:
...
return session.scalar(
select(TaskReschedule)
.where(
TaskReschedule.dag_id == dag_id,
TaskReschedule.task_id == task_id,
TaskReschedule.run_id == run_id,
TaskReschedule.map_index == map_index,
TaskReschedule.try_number == try_number,
)
.order_by(TaskReschedule.id.asc())
.with_only_columns(TaskReschedule.start_date)
.limit(1)
)
- If no data is returned, the code here assigns start_date to the current system time, so the rescheduling cycle never ends:
...
if not start_date:
start_date = timezone.utcnow()
- As a result I have a task which has been running for 6 hours (with a maximum of 1 hour set) since the moment I started debugging. A simplified sketch of why the timeout never fires is shown below.
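The following is an illustrative sketch, not the exact Airflow implementation: because start_date falls back to timezone.utcnow() on every run, the elapsed time is always close to zero and the sensor timeout can never be reached:

from airflow.utils import timezone

# Illustrative model of the timeout check in reschedule mode; the names
# first_reschedule_start_date and timeout_seconds are hypothetical.
def should_time_out(first_reschedule_start_date, timeout_seconds):
    start_date = first_reschedule_start_date
    if not start_date:
        # No task_reschedule row exists for the first try, so the start
        # date falls back to "now" on every single run ...
        start_date = timezone.utcnow()
    # ... and the elapsed time is always ~0, so this never becomes True.
    return (timezone.utcnow() - start_date).total_seconds() > timeout_seconds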
What you think should happen instead?
In that case we should try to find the first available task_reschedule record at or after the initially needed try number. This can be done by modifying the condition from:
TaskReschedule.try_number == first_try_number
to:
TaskReschedule.try_number >= first_try_number
Thanks to the order_by(TaskReschedule.id.asc()) sorting and the limit(1) statement, we would select the first record for the try following the one initially needed.
If there are no records at all, we would still get timezone.utcnow() through the already implemented logic.
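A sketch of how the query above would look with the proposed change, using the variable names from that snippet (everything else stays as it is today):

# Sketch of the proposal: select the earliest reschedule record at or
# after the requested try instead of requiring an exact match.
return session.scalar(
    select(TaskReschedule)
    .where(
        TaskReschedule.dag_id == dag_id,
        TaskReschedule.task_id == task_id,
        TaskReschedule.run_id == run_id,
        TaskReschedule.map_index == map_index,
        TaskReschedule.try_number >= try_number,  # was: == try_number
    )
    .order_by(TaskReschedule.id.asc())
    .with_only_columns(TaskReschedule.start_date)
    .limit(1)
)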
How to reproduce
Fail the first try of a sensor and delete the records in the task_reschedule table for the first try, for example as sketched below.
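A minimal sketch of the deletion step, assuming direct access to the metadata DB through Airflow's session helper; the dag_id and run_id values are hypothetical placeholders:

from airflow.models.taskreschedule import TaskReschedule
from airflow.utils.session import create_session

# Illustrative only: remove the reschedule records of the first try so
# that the lookup for try_number == 1 finds no row.
with create_session() as session:
    session.query(TaskReschedule).filter(
        TaskReschedule.dag_id == "my_dag",            # hypothetical DAG id
        TaskReschedule.task_id == "sensor-s3-version",
        TaskReschedule.run_id == "my_run_id",         # hypothetical run id
        TaskReschedule.try_number == 1,
    ).delete()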
Operating System
Airflow official docker apache/airflow:2.9.3-python3.8: Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I think your proposal is meaningful, but I am wondering what problems with Postgres are leading to this inconsistency. I think this should be addressed first. (1) You should check how to make your DB stable; if there are connection losses in other places, a lot more errors can happen. Airflow expects a stable DB. (2) I am wondering why in this case an inconsistent result is left in the DB - I would have expected all steps to be executed in one transaction, meaning either no updates at all or a consistent update.
@jscheffl Thank you for looking into it.
- I have already stabilised the DB; the cause was the lack of resource quota for the Postgres pods. However, the problem is still important because the session can be closed for many reasons - that is the DB backend's business, and we should not go into an infinite loop in any case.
- I do not see where I said "inconsistency", but anyway: the reschedule counter was incremented while the reschedule itself was not written to the DB - here I see a questionable (debatable) situation.
What would you suggest - can I implement the proposed logic as a PR? Thank you for your answer!
What would you suggest - can I implement the proposed logic as a PR?
Always. There is absolutely nothing wrong with opening a PR; we might close and reject it after we look at the code, but that is the best way to see whether your proposal is good - it is far easier and faster for maintainers to understand the scope of the issue and the solution from actual code.
Hi @jscheffl! Can you please have a look at my PR?