Fix premature evaluation of tasks in mapped task group
closes: https://github.com/apache/airflow/issues/34023
Redo of #34337. Relates to https://github.com/apache/airflow/issues/35541#issuecomment-1826259201
@uranusjr Can you please take a look at the approach? I haven't checked the performance yet, but if the approach is okay, I will check it if needed. Perhaps it can be optimized.
I also found that something goes wrong with the example from the get_relevant_upstream_map_indexes method. When I checked this DAG:
import pendulum

from airflow import DAG
from airflow.decorators import task, task_group

with DAG(
    'mul-in-tg',
    schedule='@daily',
    start_date=pendulum.DateTime(2023, 12, 26),
) as dag:
    @task
    def upstream(inp):
        return inp

    @task
    def this_task(v):  # This is self.task.
        return v * 2

    @task_group
    def tg1(inp):
        val = upstream(inp)  # This is the upstream task.
        this_task(val)  # When inp is 1, val here should resolve to 2.
        return val

    val = tg1.expand(inp=[1, 2, 3])  # This val is the same object returned by tg1.

    @task
    def another_task(inp, val):
        print("(inp, val)", (inp, val))

    @task_group
    def tg2(inp):
        another_task(inp, val)  # val here should resolve to [2, 4, 6].

    tg2.expand(inp=["a", "b"])
I observed:
Is this the expected behavior?
cc: @uranusjr - following the discussion in #35541 - do you think this one should fix the problem?
@avkirilishin can you rebase and resolve the conflicts?
Can you add this test:
def test_mapped_tasks_in_mapped_task_group_waits_for_upstreams_to_complete(dag_maker, session):
    """Test that one failed trigger rule works well in mapped task group"""
    with dag_maker() as dag:

        @dag.task
        def t1():
            return [1, 2, 3]

        @task_group("tg1")
        def tg1(a):
            @dag.task()
            def t2(a):
                return a

            @dag.task(trigger_rule=TriggerRule.ONE_FAILED)
            def t3(a):
                return a

            t2(a) >> t3(a)

        t = t1()
        tg1.expand(a=t)

    dr = dag_maker.create_dagrun()
    ti = dr.get_task_instance(task_id="t1")
    ti.run()
    dr.task_instance_scheduling_decisions()
    ti3 = dr.get_task_instance(task_id="tg1.t3")
    assert not ti3.state
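For anyone following along, here is a rough plain-Python sketch of the behavior the test above is meant to pin down. It is a toy model under my own assumptions, not Airflow's scheduler code: the function name one_failed_decision, the expanded flag, and the string states are all hypothetical and only illustrate the idea that a ONE_FAILED task inside a mapped task group should stay unscheduled while its upstream has not been expanded yet.

```python
from typing import Optional, Sequence

# Hypothetical state names for this toy model (not imported from Airflow).
FAILED_STATES = {"failed", "upstream_failed"}
FINISHED_STATES = FAILED_STATES | {"success", "skipped"}


def one_failed_decision(upstream_states: Sequence[Optional[str]], expanded: bool) -> str:
    """Toy model: decide what happens to a task with a one-failed trigger rule.

    upstream_states holds the states of the upstream task instances known so far
    (None means "not finished yet"); expanded says whether the mapped upstream
    has already been expanded into concrete task instances.
    """
    # At least one failed upstream is enough to run the task.
    if any(state in FAILED_STATES for state in upstream_states):
        return "run"
    # Upstream not expanded yet: nothing can be concluded, so keep waiting
    # instead of treating "no upstream instances" as "all upstreams done".
    if not expanded:
        return "wait"
    # Every upstream finished and none failed: the task can be skipped.
    if all(state in FINISHED_STATES for state in upstream_states):
        return "skip"
    return "wait"


# Situation right after t1 ran in the test: t2 has no expanded instances yet.
print(one_failed_decision([], expanded=False))
```

In this model the premature evaluation corresponds to returning "skip" for an empty, not yet expanded upstream list; the test's final assert (ti3 has no state) corresponds to "wait".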
@avkirilishin are you still working on this PR?
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.