airflow
airflow copied to clipboard
Change signature of `cleanup_stuck_queued_tasks` to return TIs or TI Keys
Body
This has gotten to an awkward place where we are checking that repr(ti) is in the list of "readable versions of ti"
There's no guarantee that the executor will use repr and, just why not use the TI object.
Here's the relevant bit of code https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L1572-L1580
Reproduced here for convenience:
cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
for ti in stuck_tis:
if repr(ti) in cleaned_up_task_instances:
self._task_context_logger.warning(
"Marking task instance %s stuck in queued as failed. "
"If the task instance has available retries, it will be retried.",
ti,
ti=ti,
)
Should be something like this instead:
for ti in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
self._task_context_logger.warning(
"Marking task instance %s stuck in queued as failed. "
"If the task instance has available retries, it will be retried.",
ti,
ti=ti,
)
But it will take some effort to figure out how to evolve the executor interface (deprecation warning, updating providers, etc)
I suppose this is a cost of having executor be a public interface.
just fyisies @o-nikolas @potiuk @pankajastro @sunank200 @vincbeck
Committer
- [X] I acknowledge that I am a maintainer/committer of the Apache Airflow project.