Fix premature evaluation of tasks in mapped task group

Open avkirilishin opened this issue 2 years ago • 4 comments

closes: https://github.com/apache/airflow/issues/34023

Redo of #34337. Relates to https://github.com/apache/airflow/issues/35541#issuecomment-1826259201

@uranusjr Can you please take a look at the approach? I haven't checked the performance yet; if the approach is okay, I will check it if needed. Perhaps it can be optimized.

I also found that something goes wrong with the example from the get_relevant_upstream_map_indexes method. When I checked this DAG:

import pendulum

from airflow import DAG
from airflow.decorators import task, task_group


with DAG(
    'mul-in-tg',
    schedule='@daily',
    start_date=pendulum.DateTime(2023, 12, 26),
) as dag:
    @task
    def upstream(inp):
        return inp

    @task
    def this_task(v):  # This is self.task.
        return v * 2

    @task_group
    def tg1(inp):
        val = upstream(inp)  # This is the upstream task.
        this_task(val)  # When inp is 1, val here should resolve to 2.
        return val

    val = tg1.expand(inp=[1, 2, 3])  # This val is the same object returned by tg1.

    @task
    def another_task(inp, val):
        print("(inp, val)", (inp, val))

    @task_group
    def tg2(inp):
        another_task(inp, val)  # val here should resolve to [2, 4, 6].

    tg2.expand(inp=["a", "b"])

I observed:

[image]

Is this the expected behavior?
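To make the question concrete, here is a minimal pure-Python sketch of the resolution the comments in the DAG describe. It does not use Airflow and is not how get_relevant_upstream_map_indexes is implemented; the helper names `resolve_inside_group` and `resolve_outside_group` are hypothetical. Note that the comments mention 2 and [2, 4, 6], which would correspond to an upstream that doubles its input, while `upstream` as written above returns its input unchanged, so both variants are shown.

```python
from typing import Callable, List, Sequence


def resolve_inside_group(
    upstream: Callable[[int], int], inputs: Sequence[int], map_index: int
) -> int:
    """Value seen by a task in the SAME mapped group: one element per map index."""
    return upstream(inputs[map_index])


def resolve_outside_group(
    upstream: Callable[[int], int], inputs: Sequence[int]
) -> List[int]:
    """Value seen by a task OUTSIDE the group: all expanded results collected."""
    return [upstream(inp) for inp in inputs]


def identity(inp: int) -> int:
    # Mirrors `upstream` as written in the DAG above.
    return inp


def double(inp: int) -> int:
    # Assumption: the upstream implied by the "2" and "[2, 4, 6]" comments.
    return inp * 2


inputs = [1, 2, 3]
print(resolve_inside_group(double, inputs, 0))   # 2
print(resolve_outside_group(double, inputs))     # [2, 4, 6]
print(resolve_outside_group(identity, inputs))   # [1, 2, 3]
```

Under this model, `this_task` inside `tg1` should receive a single value per map index, and `another_task` inside `tg2` should receive the full list in each of its two expansions.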

avkirilishin • Dec 28 '23 01:12

cc: @uranusjr - following the discussion in #35541 - do you think this one should fix the problem?

potiuk • Jan 25 '24 17:01

@avkirilishin can you rebase and resolve the conflicts?

eladkal • Mar 16 '24 13:03

Can you add this test:

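from airflow.decorators import task_group
from airflow.utils.trigger_rule import TriggerRule


def test_mapped_tasks_in_mapped_task_group_waits_for_upstreams_to_complete(dag_maker, session):
    """Test that the ONE_FAILED trigger rule waits for upstreams in a mapped task group."""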
    with dag_maker() as dag:

        @dag.task
        def t1():
            return [1, 2, 3]

        @task_group("tg1")
        def tg1(a):
            @dag.task()
            def t2(a):
                return a

            @dag.task(trigger_rule=TriggerRule.ONE_FAILED)
            def t3(a):
                return a

            t2(a) >> t3(a)

        t = t1()
        tg1.expand(a=t)

    dr = dag_maker.create_dagrun()
    ti = dr.get_task_instance(task_id="t1")
    ti.run()
    dr.task_instance_scheduling_decisions()
    ti3 = dr.get_task_instance(task_id="tg1.t3")
    assert not ti3.state
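The behavior this test guards can be illustrated with a simplified, Airflow-free model of a "one failed" decision. This is only a sketch under assumptions: the function `one_failed_decision` and its `expanded` flag are hypothetical, and the real trigger rule logic handles more states. The point is that with no expanded upstreams, a naive "all upstreams done, none failed" check passes trivially and would skip the task too early, so the decision has to wait instead.

```python
from typing import Optional, Sequence

FAILED = "failed"
SUCCESS = "success"


def one_failed_decision(
    upstream_states: Sequence[Optional[str]], expanded: bool
) -> str:
    """Return 'run', 'skip', or 'wait' for a task with a one-failed rule.

    upstream_states holds one entry per upstream task instance;
    None means that instance has not finished yet.
    """
    if not expanded:
        # Upstream mapped tasks do not exist yet, so nothing can be concluded.
        # Without this guard, all([]) below is True and the task would be skipped.
        return "wait"
    if any(state == FAILED for state in upstream_states):
        return "run"
    if all(state is not None for state in upstream_states):
        # Every upstream finished and none failed.
        return "skip"
    return "wait"


print(one_failed_decision([], expanded=False))                     # wait
print(one_failed_decision([None, None, None], expanded=True))      # wait
print(one_failed_decision([SUCCESS, FAILED, None], expanded=True)) # run
print(one_failed_decision([SUCCESS] * 3, expanded=True))           # skip
```

In the test above, the final `assert not ti3.state` corresponds to the "wait" outcome: after only `t1` has run, `tg1.t3` should have no state yet.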

ephraimbuddy • Mar 22 '24 17:03

@avkirilishin are you still working on this PR?

eladkal • Apr 26 '24 12:04

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot] • Aug 31 '24 06:08