airflow
MappedTasks don't short-circuit
Apache Airflow version
2.10.3
If "Other Airflow 2 version" selected, which one?
No response
What happened?
It seems that tasks created via .expand() do not inherit the downstream tasks of their parent (the original, unexpanded task). This becomes an issue when we expand a @task.short_circuit task, because short-circuiting then no longer works.
What do you think should happen instead?
My understanding from the documentation is that step_two.expand(do=step_one.expand(i=start())) is supposed to be equivalent to:
flowchart LR
start --> A1[step_one_0] --> B1[step_two_0]
start --> A2[step_one_1] --> B2[step_two_1]
start --> A3[step_one_2] --> B3[step_two_2]
start --> A4[…] --> B4[…]
start --> A9[step_one_9] --> B9[step_two_9]
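The expected per-index behaviour can be modeled in plain Python, without Airflow. This is only a sketch of the intended semantics. It assumes that each mapped step_one instance gates exactly the step_two instance with the same map index. The `rng` parameter and the `run_expected` helper are illustrative and are not Airflow APIs.

```python
import random
from typing import Dict, List


def start() -> List[int]:
    # Mirrors start() in the repro: ten values to map over.
    return list(range(10))


def step_one(i: int, rng: random.Random) -> bool:
    # Short-circuit condition: False means "skip my own downstream instance".
    # i is unused here, just as in the repro, where it is only printed.
    return rng.random() >= 0.5


def run_expected(seed: int = 0) -> Dict[int, str]:
    """Return the expected final state of each step_two map index."""
    rng = random.Random(seed)
    states: Dict[int, str] = {}
    for map_index, i in enumerate(start()):
        if step_one(i, rng):
            states[map_index] = "success"  # step_two_<map_index> runs
        else:
            states[map_index] = "skipped"  # only step_two_<map_index> is skipped
    return states


if __name__ == "__main__":
    for idx, state in run_expected(seed=42).items():
        print(f"step_two_{idx}: {state}")
```

Skipping is per map index. A False from step_one_3 should skip only step_two_3, which matches the flowchart above.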
How to reproduce
import random
from typing import List
import pendulum
from airflow.decorators import dag, task
@dag(
    "test_foo",
    schedule=None,
    start_date=pendulum.now(),
    render_template_as_native_obj=True,
    dag_display_name="Test random things",
)
def test_foo():
    @task.python
    def start() -> List[int]:
        return [i for i in range(10)]
    @task.short_circuit
    def step_one(i: int) -> bool:
        print(f"Hello from step {i}")
        return random.random() >= 0.5
    @task.python
    def step_two(do: bool):
        if not do:
            print("Should've been skipped.")
        print("Doing stuff")
    step_two.expand(do=step_one.expand(i=start()))
test_foo()
if __name__ == "__main__":
    test_foo().test()
Roughly half of the 10 mapped step_one instances are expected to return False and short-circuit. When one does, its log reads something like this:
Hello from step 5
[2024-11-11 13:31:09,695] {python.py:240} INFO - Done. Returned value was: False
[2024-11-11 13:31:09,697] {python.py:309} INFO - Condition result is False
[2024-11-11 13:31:09,698] {python.py:316} INFO - No downstream tasks; nothing to do.
However, inspecting the DAG's task_dict in the debugger shows that step_one does have step_two as a downstream task and, conversely, step_two has step_one as an upstream task:
__pydevd_ret_val_dict['factory'].task_dict['step_one'].downstream_task_ids
{'step_two'}
__pydevd_ret_val_dict['factory'].task_dict['step_two'].upstream_task_ids
{'step_one'}
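A simplified, hypothetical model of the decision behind the log lines above shows why the two observations conflict. The unexpanded step_one knows about step_two, but the instance that actually runs behaves as if its downstream set were empty. The class and method names are made up for illustration. This is not Airflow's implementation. Only the two log messages are taken from the log above.

```python
import logging
from dataclasses import dataclass, field
from typing import List, Set

logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
log = logging.getLogger("short_circuit_model")


@dataclass
class FakeShortCircuit:
    """Hypothetical stand-in for a running short-circuit task."""

    task_id: str
    downstream_task_ids: Set[str] = field(default_factory=set)

    def decide(self, condition: bool) -> List[str]:
        """Return the task IDs this instance would skip."""
        log.info("Condition result is %s", condition)
        if condition:
            return []
        if not self.downstream_task_ids:
            # This branch produces the message seen in the mapped task's log.
            log.info("No downstream tasks; nothing to do.")
            return []
        return sorted(self.downstream_task_ids)


# The task in task_dict knows about step_two ...
parent = FakeShortCircuit("step_one", {"step_two"})
# ... but suppose the runtime copy used by a mapped instance lost that information.
mapped_instance = FakeShortCircuit("step_one")

print(parent.decide(False))           # ['step_two']
print(mapped_instance.decide(False))  # []
```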
Operating System
Ubuntu 24.04.1 LTS (Noble Numbat)
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
I'm testing this on Standalone
Anything else?
I can get closer to the desired behaviour by using a list comprehension in the dependency, but that can't be done dynamically: a PlainXComArg can't be iterated when the DAG is parsed, and even then the result isn't exactly the desired behaviour.
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I was able to get the desired behaviour by creating a task group, pulling the return_value XCom from start and manually creating all the tasks in a loop, which I feel shouldn't be necessary:
from random import random
from typing import List
from airflow.decorators import dag, task, task_group
from airflow.models import XCom
from pendulum import now
@dag(
    "test_foo",
    schedule=None,
    start_date=now(),
    dag_display_name="Test random things",
)
def test_foo():
    @task.python
    def start() -> List[int]:
        return [i for i in range(1, 11)]
    @task_group()
    def tg():
        @task.short_circuit
        def step_one(i: int) -> bool:
            print(f"Hello from step {i}")
            return random() >= 0.5
        @task.python(trigger_rule="one_done")
        def step_two(do: bool):
            if not do:
                print("Should've been skipped.")
            print("Doing stuff")
        for i in XCom.get_one(key="return_value", task_id="start", run_id=XCom.run_id):
            step_two(step_one(i))
    start() >> tg()
test_foo()
if __name__ == "__main__":
    test_foo().test()
I'm seeing this on 2.9.3, too.
@eladkal We upgraded from Airflow 2.9.3 to 2.10.3 and have a similar issue: we don't see logs for any task. We get the error "No task logs found. Try the Event Log tab for more context."
May I know whether this is resolved?
Hey @Chais, I've managed to reproduce the issue using the code snippets that you provided.
~The root cause of this issue is that short circuit isn't currently suitable for task-generated mapping: when the short-circuit operator runs in that case, the downstream tasks have not been expanded yet, so it just logs "No downstream tasks; nothing to do." and moves on as if nothing happened:~
https://github.com/apache/airflow/blob/b5f033a933d6bba2433a50a62b389b6546123ac4/airflow/operators/python.py#L315
~In the second example that you provided, the downstream tasks are known beforehand, so it works normally.~
I'll try to work on a fix.
Edit: See my response below.
Shouldn’t downstream_task_ids still contain the unexpanded base task in this case? Something still seems off to me. And we should probably fix the operator in any case.
After spending some more time on it, I think you're right, and the problem actually seems to be here (see the TODO comment just above it): https://github.com/apache/airflow/blob/caa90a18d0790fb51c919485b749047d96c315c0/airflow/models/taskinstance.py#L1331
Up to this point, task does contain the downstream_task_ids, but because mapped operators return early here, this is not recorded for the downstream tasks.
A hacky solution would be to push the downstream_task_ids into a hidden XCom so that the mapped short-circuit task can use them, but I'm also open to suggestions :)
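The proposal can be sketched with a plain dictionary standing in for the XCom table. Everything here is hypothetical: the store, the `__downstream_task_ids` key, and both helper functions. The sketch only illustrates the idea of the unexpanded task publishing its downstream IDs so that each mapped instance can read them back. It does not show how Airflow stores XComs or how the eventual fix works.

```python
from typing import Dict, List, Tuple

# Hypothetical XCom store keyed by (run_id, task_id, key).
XComKey = Tuple[str, str, str]
xcom_store: Dict[XComKey, List[str]] = {}

HIDDEN_KEY = "__downstream_task_ids"  # hypothetical "hidden" key


def push_downstream_ids(run_id: str, task_id: str, downstream: List[str]) -> None:
    """Hypothetical: run once for the unexpanded task, before its mapped instances."""
    xcom_store[(run_id, task_id, HIDDEN_KEY)] = sorted(downstream)


def tasks_to_skip(
    run_id: str, task_id: str, map_index: int, condition: bool
) -> List[Tuple[str, int]]:
    """Hypothetical: run by each mapped instance to decide what to skip.

    Returns (downstream_task_id, map_index) pairs, so a False condition only
    targets the downstream instance with the same map index.
    """
    if condition:
        return []
    stored = xcom_store.get((run_id, task_id, HIDDEN_KEY), [])
    return [(downstream_id, map_index) for downstream_id in stored]


push_downstream_ids("manual__1", "step_one", ["step_two"])
print(tasks_to_skip("manual__1", "step_one", 3, condition=False))  # [('step_two', 3)]
print(tasks_to_skip("manual__1", "step_one", 3, condition=True))   # []
```

Keying on run_id keeps values from different DAG runs apart. Returning map-index pairs means a False result from one instance does not skip its siblings.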
Why would not recording the TaskMap affect this? The function does only one thing: it pushes a row to the TaskMap table, which is used only for task mapping.
You're correct; I think I now understand better what's going on :) I've managed to come up with a solution, so feel free to review the PRs when you have time.
I've resolved the issue for version 2.X, so hopefully the fix will be available starting with v2.10.5 or v2.11.0 (whichever comes first).
I'm leaving this issue open, as some arrangements still need to be made before merging the corresponding PR to main (v3+).