
Airflow 2.7.3: Mapped task group no longer works as expected

Open Ahmed-Khaled-CS opened this issue 1 year ago • 8 comments

Apache Airflow version

2.7.3

What happened

Hello folks,

I have a use case that no longer works after upgrading to the new 2.7.3 release. After some investigation, I found that it is related to this PR: when I reverted it locally and ran some tests, everything worked as expected.

Please find below how to reproduce the issue.

Cc @ephraimbuddy. Thanks.

What you think should happen instead

No response

How to reproduce

Below is the DAG code:

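```python
from datetime import datetime

from airflow.decorators import dag, task, task_group
from airflow.exceptions import AirflowSkipException


# The DAG function name and the @dag arguments are illustrative; the original
# snippet omitted the imports and the @dag decorator.
@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def mapped_task_group_skip():

    @task
    def make_list():
        return [4, 42, 2]

    @task
    def double(n):
        # Only the mapped instance that receives 42 is skipped.
        if n == 42:
            raise AirflowSkipException("42")
        return n * 2

    @task
    def last(n):
        print(n)

    @task_group
    def group(n: int) -> None:
        last(double(n))

    # Renamed from `list` to avoid shadowing the builtin.
    numbers = make_list()
    group.expand(n=numbers)


mapped_task_group_skip()
```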

Running this DAG on 2.7.3 gives the following result:

[Three screenshots of the 2.7.3 run, taken 2023-11-08 at 22:58, not preserved]

As you can see, the last task is skipped, and map indexes 0 and 1 were not launched. The same issue occurs if you raise AirflowFailException instead of AirflowSkipException, except that the status of the last task is failed.

On the other hand, if you run the same DAG on 2.7.2 (or without the changes from the PR above), you get the result below, which is the correct one:

[Three screenshots of the 2.7.2 run, taken 2023-11-08 at 22:53, not preserved]

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

Airflow Breeze

Anything else

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

Ahmed-Khaled-CS • Nov 08 '23 22:11

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot] • Nov 08 '23 22:11

Hello @ephraimbuddy, @uranusjr, and @eladkal, could you please confirm whether this is a regression introduced by this PR?

I'm pinging you because you contributed to this PR, and hopefully you have an idea about my issue 😄

Thank you.

Ahmed-Khaled-CS • Nov 14 '23 12:11

I noticed this behaviour change and mentioned it here, but I guess I merged too quickly (though I did wait for some weeks).

The issue is that the task now has to wait for all the upstream tasks to finish before proceeding. When it does proceed, this check evaluates to true because one of them is skipped, so the task instance (TI) is marked as skipped.
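To make the explanation above concrete, here is a toy model in plain Python. It is not Airflow's scheduler code: the function names, the string states, and the two scope values ("same_index" and "all") are hypothetical, and only the skip branch of the default all_success trigger rule is modelled. It contrasts evaluating each mapped last instance against only the double instance with the same map index versus against every double instance.

```python
from typing import Dict, List

# Hypothetical, simplified states of the mapped `double` task instances,
# keyed by map index, for the input [4, 42, 2] (index 1 received 42 and skipped).
DOUBLE_STATES: Dict[int, str] = {0: "success", 1: "skipped", 2: "success"}


def relevant_upstream(map_index: int, scope: str) -> List[str]:
    """Return the upstream states a mapped `last` instance is evaluated against.

    scope="same_index": only the `double` instance with the same map index.
    scope="all": every `double` instance, regardless of map index.
    """
    if scope == "same_index":
        return [DOUBLE_STATES[map_index]]
    if scope == "all":
        return list(DOUBLE_STATES.values())
    raise ValueError(f"unknown scope: {scope!r}")


def all_success_outcome(upstream: List[str]) -> str:
    """Toy version of all_success: skip if any upstream is skipped."""
    if any(state == "skipped" for state in upstream):
        return "skipped"
    if all(state == "success" for state in upstream):
        return "run"
    return "wait"  # some upstream is still unfinished


def last_outcomes(scope: str) -> Dict[int, str]:
    # Evaluate the toy trigger rule once per mapped `last` instance.
    return {i: all_success_outcome(relevant_upstream(i, scope)) for i in DOUBLE_STATES}


print(last_outcomes("same_index"))  # {0: 'run', 1: 'skipped', 2: 'run'}
print(last_outcomes("all"))         # {0: 'skipped', 1: 'skipped', 2: 'skipped'}
```

In this toy model, the per-index scope skips only the instance whose upstream received 42, whereas the all-instances scope skips every last instance as soon as any one double instance is skipped, which is the kind of over-skipping described in this issue.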

@uranusjr, do you think we should apply the first commit?

ephraimbuddy • Nov 14 '23 12:11

I also mentioned in that commit that the fix may not work correctly either. I feel we should revert the PR entirely and investigate a correct fix altogether.

uranusjr • Nov 15 '23 08:11

> I feel we should revert the PR entirely and investigate a correct fix altogether.

Can we add a unit test that catches the problematic case, so that when we try a new approach we can see whether it passes?

eladkal • Nov 25 '23 09:11

I believe we now have a test, so can we reattempt the fix?

potiuk • Dec 30 '23 11:12

@potiuk, I think https://github.com/apache/airflow/pull/36462 handles it?

eladkal • Dec 30 '23 15:12

Not sure :). Have not looked at it (yet).

potiuk • Dec 30 '23 17:12

I'm closing this issue because the original PR (#34337) that caused it was reverted. I'm working on an improved version of that PR, and I'll make sure this issue doesn't recur.

shahar1 • Jun 15 '24 09:06