Airflow 2.7.3: Mapped task group no longer works as expected
Apache Airflow version
2.7.3
What happened
Hello folks,
I have a use case that no longer works after the release of version 2.7.3. After some investigation, I found that it is related to this PR: when I reverted it locally and ran some tests, everything worked as expected.
Please find below how to reproduce the issue.
Cc @ephraimbuddy Thanks.
What you think should happen instead
No response
How to reproduce
Below is the DAG code:
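# Imports and the @dag arguments are assumptions; the original snippet omitted them.
from datetime import datetime

from airflow.decorators import dag, task, task_group
from airflow.exceptions import AirflowSkipException


# The DAG function keeps the name `dag` used in the original report.
@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def dag():
    @task
    def make_list():
        return [4, 42, 2]

    @task
    def double(n):
        # Skip the mapped instance that receives 42.
        if n == 42:
            raise AirflowSkipException("42")
        return n * 2

    @task
    def last(n):
        print(n)

    @task_group
    def group(n: int) -> None:
        last(double(n))

    numbers = make_list()
    group.expand(n=numbers)


dag()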
Running this DAG with version 2.7.3 gives the result below:
As you can see, the status of the last task is skipped, and map indexes 0 and 1 were not launched. You get the same issue with AirflowFailException instead of AirflowSkipException, in which case the status of the last task is failed.
On the other hand, if you run the same DAG with version 2.7.2 (or without the changes from the PR mentioned above), you get the result below, which is the correct one:
Operating System
linux
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
Anything else
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
Hello @ephraimbuddy, @uranusjr, and @eladkal, could you please confirm whether this is a regression introduced by this PR?
I'm pinging you because you contributed to this PR, and hopefully you have an idea about my issue 😄
Thank you.
I saw this behaviour change and mentioned it here, but I guess I acted too fast in merging (though I waited for some weeks).
The issue is that the task has to wait for all of its upstream tasks to finish before proceeding. Once it proceeds, this check evaluates to true because one of the upstream tasks is skipped, so the ti is marked as skipped.
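To make the explanation above concrete, here is a minimal pure-Python sketch of the two ways the upstream check can be evaluated for the DAG in this report. It is not Airflow's implementation: the `State` enum, `DOUBLE_STATES`, and all function names are hypothetical, and the model only decides whether a `last` instance would be skipped, not which instances the scheduler actually launches.

```python
from enum import Enum


class State(str, Enum):
    SUCCESS = "success"
    SKIPPED = "skipped"


# Assumed outcome of the mapped `double` task for the input [4, 42, 2],
# keyed by map index: index 1 raises AirflowSkipException.
DOUBLE_STATES = {0: State.SUCCESS, 1: State.SKIPPED, 2: State.SUCCESS}


def resolve(upstream_states):
    """Simplified all_success-style rule: any skipped upstream skips the task."""
    if any(state is State.SKIPPED for state in upstream_states):
        return State.SKIPPED
    return State.SUCCESS  # stands in for "the task is allowed to run"


def last_states_same_index(double_states):
    """Expected behaviour: last[i] only depends on double[i]."""
    return {i: resolve([state]) for i, state in double_states.items()}


def last_states_all_indexes(double_states):
    """Reported behaviour: last[i] is checked against every double[*]."""
    every_upstream = list(double_states.values())
    return {i: resolve(every_upstream) for i in double_states}


if __name__ == "__main__":
    print(last_states_same_index(DOUBLE_STATES))
    print(last_states_all_indexes(DOUBLE_STATES))
```

In this model, the per-index check lets `last` run for map indexes 0 and 2 and skips only index 1, while the check over all mapped upstream instances marks every `last` instance as skipped because of the single skipped `double`.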
@uranusjr, do you think we should apply the first commit?
I also mentioned in that commit that the fix may not work right either though. I feel we should revert the PR entirely and investigate a correct fix altogether.
> I feel we should revert the PR entirely and investigate a correct fix altogether.
Can we add a unit test that catches the problematic case, so that when we try a new approach we can see whether it passes?
I believe we now have a test, so can we make another attempt at the fix?
@potiuk I think https://github.com/apache/airflow/pull/36462 handles it?
Not sure :). Have not looked at it (yet).
I'm closing this issue, as the original PR (#34337) that caused it was reverted. I'm trying to work on an improved version of the original PR, and I'll ensure that this issue doesn't reoccur.