
MappedTasks don't short-circuit

Open Chais opened this issue 1 year ago • 9 comments

Apache Airflow version

2.10.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

It seems that tasks created via .expand() do not inherit the downstream tasks of their parent/original task. This becomes a problem when expanding a task.short_circuit, because short-circuiting no longer works.

What you think should happen instead?

My understanding from the documentation is that step_two.expand(do=step_one.expand(i=start())) is supposed to be equivalent to:

flowchart LR
start --> A1[step_one_0] --> B1[step_two_0]
start --> A2[step_one_1] --> B2[step_two_1]
start --> A3[step_one_2] --> B3[step_two_2]
start --> A4[…] --> B4[…]
start --> A9[step_one_9] --> B9[step_two_9]
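To pin down the expected semantics, here is a plain-Python model (no Airflow involved) of what the diagram describes: a falsy step_one result at map index i should skip only step_two at the same map index. Map indexes are simulated with enumerate(), and the deterministic predicate stands in for random.random() >= 0.5 purely for illustration; the function name is hypothetical.

```python
from typing import Callable, Dict, List


def expected_step_two_states(
    items: List[int], step_one: Callable[[int], bool]
) -> Dict[int, str]:
    """Model (not Airflow code) of per-map-index short-circuiting.

    Each position in ``items`` plays the role of one map index of
    step_one; the result maps each step_two map index to its state.
    """
    states: Dict[int, str] = {}
    for map_index, item in enumerate(items):
        # A falsy step_one result should skip only the step_two
        # instance that shares its map index.
        states[map_index] = "success" if step_one(item) else "skipped"
    return states


# Deterministic stand-in for random.random() >= 0.5: even items pass.
print(expected_step_two_states(list(range(4)), lambda i: i % 2 == 0))
# {0: 'success', 1: 'skipped', 2: 'success', 3: 'skipped'}
```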

How to reproduce

import random
from typing import List

import pendulum
from airflow.decorators import dag, task


@dag(
    "test_foo",
    schedule=None,
    start_date=pendulum.now(),
    render_template_as_native_obj=True,
    dag_display_name="Test random things",
)
def test_foo():
    @task.python
    def start() -> List[int]:
        return [i for i in range(10)]

    @task.short_circuit
    def step_one(i: int) -> bool:
        print(f"Hello from step {i}")
        return random.random() >= 0.5

    @task.python
    def step_two(do: bool):
        if not do:
            print("Should've been skipped.")
        print("Doing stuff")

    step_two.expand(do=step_one.expand(i=start()))


test_foo()

if __name__ == "__main__":
    test_foo().test()

Roughly half of the 10 mapped step_one tasks are expected to return False and short-circuit. When one does, the log reads something like this:

Hello from step 5
[2024-11-11 13:31:09,695] {python.py:240} INFO - Done. Returned value was: False
[2024-11-11 13:31:09,697] {python.py:309} INFO - Condition result is False
[2024-11-11 13:31:09,698] {python.py:316} INFO - No downstream tasks; nothing to do.

However, inspecting the task_dict of the DAG returned by the factory (in the debugger) shows that step_one does have step_two as a downstream task and, conversely, step_two has step_one as an upstream task:

__pydevd_ret_val_dict['factory'].task_dict['step_one'].downstream_task_ids
{'step_two'}
__pydevd_ret_val_dict['factory'].task_dict['step_two'].upstream_task_ids
{'step_one'}
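The two log lines above come from the short-circuit decision: log the condition, leave downstream tasks alone if it is truthy, and otherwise bail out early if there are no downstream task IDs. A simplified plain-Python model of that decision follows; it is an approximation for illustration, not Airflow's source, and only the two quoted log strings are taken from the log above.

```python
from typing import List, Set, Tuple


def short_circuit_decision(
    condition: object, downstream_task_ids: Set[str]
) -> Tuple[List[str], Set[str]]:
    """Return (log_lines, tasks_to_skip) for one short-circuit run."""
    logs = [f"Condition result is {condition}"]
    if condition:
        # Truthy result: downstream tasks are left alone.
        return logs, set()
    if not downstream_task_ids:
        # The branch hit in the reported log: nothing to skip.
        logs.append("No downstream tasks; nothing to do.")
        return logs, set()
    # Falsy result with known downstream tasks: these would be skipped.
    return logs, set(downstream_task_ids)


print(short_circuit_decision(False, set()))
print(short_circuit_decision(False, {"step_two"}))
```

With the parse-time value {'step_two'} the model would skip step_two, so the logged branch suggests that the mapped instance saw an empty set of downstream tasks at run time.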

Operating System

Ubuntu 24.04.1 LTS (Noble Numbat)

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

I'm testing this on a standalone deployment.

Anything else?

I can get closer to the desired behaviour by using a list comprehension in the dependency, but that can't be done dynamically because PlainXComArgs can't be iterated, and it still doesn't produce exactly the desired behaviour.

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

Chais, Nov 11 '24

I was able to get the desired behaviour by creating a task group, pulling the return_value XCom from start and manually creating all the tasks in a loop, which I feel shouldn't be necessary:

from random import random
from typing import List

from airflow.decorators import dag, task, task_group
from airflow.models import XCom
from pendulum import now


@dag(
    "test_foo",
    schedule=None,
    start_date=now(),
    dag_display_name="Test random things",
)
def test_foo():
    @task.python
    def start() -> List[int]:
        return [i for i in range(1, 11)]

    @task_group()
    def tg():
        @task.short_circuit
        def step_one(i: int) -> bool:
            print(f"Hello from step {i}")
            return random() >= 0.5

        @task.python(trigger_rule="one_done")
        def step_two(do: bool):
            if not do:
                print("Should've been skipped.")
            print("Doing stuff")

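        # Caveat: this loop runs when the DAG file is parsed, not when the DAG runs,
        # and XCom.run_id is the ORM column rather than the current run's ID, so which
        # run's XCom gets read here is not well defined. get_one() returns None when
        # no matching XCom exists yet, hence the empty-list fallback.
        for i in XCom.get_one(key="return_value", task_id="start", run_id=XCom.run_id) or []:
            step_two(step_one(i))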

    start() >> tg()


test_foo()

if __name__ == "__main__":
    test_foo().test()

Chais, Nov 12 '24

I'm seeing this on 2.9.3, too.

Chais, Nov 20 '24

@eladkal We upgraded from Airflow 2.9.3 to 2.10.3 and have a similar issue: we don't see logs for any task. We get the error "No task logs found. Try the Event Log tab for more context."

May I know whether this has been resolved?

Paniraj2010, Dec 04 '24

Hey @Chais, I've managed to reproduce the issue using the code snippets you provided. ~The root cause of this issue is that short circuit isn't currently suitable for working with task-generated mapping: when the short-circuit operator runs in that case, the downstream tasks have not been expanded yet, so it just returns No downstream tasks; nothing to do. and moves on as if nothing happened:~ https://github.com/apache/airflow/blob/b5f033a933d6bba2433a50a62b389b6546123ac4/airflow/operators/python.py#L315 ~In the second example you provided, the downstream tasks are known beforehand, so it works normally.~ I'll try to work on a fix.

Edit: See my response below.

shahar1, Dec 07 '24

Shouldn’t downstream_task_ids still contain the unexpanded base task in this case? Something still seems off to me. And we should probably fix the operator in any case.

uranusjr, Dec 09 '24

> Shouldn’t downstream_task_ids still contain the unexpanded base task in this case? Something still seems off to me. And we should probably fix the operator in any case.

After spending some more time on this, I think you're correct, and the problem actually seems to be here (see the TODO comment just above it): https://github.com/apache/airflow/blob/caa90a18d0790fb51c919485b749047d96c315c0/airflow/models/taskinstance.py#L1331

Up to this point, task contains the downstream_task_ids, but because of the early return for mapped operators, they aren't recorded in the downstream tasks.

A hacky solution would be to push the downstream_task_ids into a hidden XCom so that the mapped short-circuit can use them, but I'm also open to suggestions :)
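A minimal plain-Python sketch of the hidden-XCom idea, assuming an in-memory dict in place of the XCom table. The key name and both helper functions are hypothetical and not part of Airflow; the sketch also assumes skips are scoped to the same map index, as in the expected diagram in the report.

```python
from typing import Dict, Set, Tuple

# In-memory stand-in for the XCom table: (task_id, map_index, key) -> value.
XComStore = Dict[Tuple[str, int, str], Set[str]]

# Hypothetical key name for the "hidden" XCom.
HIDDEN_KEY = "__downstream_task_ids"


def push_downstream_ids(
    store: XComStore, task_id: str, map_index: int, downstream: Set[str]
) -> None:
    """Record the downstream task IDs while they are still known."""
    store[(task_id, map_index, HIDDEN_KEY)] = set(downstream)


def tasks_to_skip(
    store: XComStore, task_id: str, map_index: int, condition: bool
) -> Set[Tuple[str, int]]:
    """Return the (task_id, map_index) pairs a falsy short-circuit would skip."""
    if condition:
        return set()
    downstream = store.get((task_id, map_index, HIDDEN_KEY), set())
    # Assumption: skip only downstream instances with the same map index.
    return {(task, map_index) for task in downstream}


store: XComStore = {}
push_downstream_ids(store, "step_one", 3, {"step_two"})
print(tasks_to_skip(store, "step_one", 3, condition=False))  # {('step_two', 3)}
```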

shahar1, Dec 12 '24

Why does not recording TaskMap affect this? The function only does one thing: push a row to the TaskMap table. This table is only used for task mapping.
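For context, a rough plain-Python model of what a TaskMap row is for: it records how many items an upstream task returned, so that a downstream .expand() knows how many map indexes to create. The dataclass and functions are hypothetical, simplified stand-ins (only the length is modeled), not Airflow's ORM model; the point is that nothing in such a row describes which downstream tasks to skip.

```python
from dataclasses import dataclass
from typing import List, Sequence


@dataclass(frozen=True)
class TaskMapRow:
    """Hypothetical, simplified stand-in for one TaskMap table row."""

    task_id: str
    map_index: int  # -1 is Airflow's map_index for an unmapped task instance
    length: int  # how many items the task returned


def record_task_map(task_id: str, map_index: int, value: Sequence[object]) -> TaskMapRow:
    # Only the size of the returned value is recorded, not its contents.
    return TaskMapRow(task_id=task_id, map_index=map_index, length=len(value))


def downstream_map_indexes(row: TaskMapRow) -> List[int]:
    # A downstream .expand() over this output gets one instance per item.
    return list(range(row.length))


row = record_task_map("start", -1, list(range(10)))
print(downstream_map_indexes(row))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```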

uranusjr, Dec 13 '24

> Why does not recording TaskMap affect this? The function only does one thing: push a row to the TaskMap table. This table is only used for task mapping.

You're correct; I think I now understand better what's going on :) I've managed to come up with a solution, so feel free to review the PRs when you have time.

shahar1, Dec 13 '24

I've resolved the issue for version 2.x, so hopefully the fix will be available starting with v2.10.5 or v2.11.0 (whichever comes first). I'm leaving this issue open because there are some arrangements to make before merging the corresponding PR into main (v3+).

shahar1, Dec 18 '24