
Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Open Gollum999 opened this issue 2 years ago • 11 comments

Apache Airflow version

2.3.3 (latest released)

What happened

As the title states, if you have dynamically mapped tasks inside of a TaskGroup, those tasks do not get the group_id prepended to their respective task_ids. This causes at least a couple of undesirable side effects:

  1. Task names are truncated in Grid/Graph* View. The tasks below are named plus_one and plus_two:

[Screenshots from 2022-07-19 13-29-05 and 13-47-47 showing the truncated task names]

Presumably this is because the UI normally strips off the group_id prefix.

* Graph View was very inconsistent in my experience. Sometimes the names are truncated, and sometimes they render correctly. I haven't figured out the pattern behind this behavior.

  2. Duplicate task_ids across groups raise airflow.exceptions.DuplicateTaskIdFound, even when the group_id would normally disambiguate them.
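To illustrate the first symptom, the sketch below assumes the UI derives a label by cutting the first len(group_id) + 1 characters off each task_id inside a group. naive_label and safe_label are hypothetical helpers, not the actual webserver code; they only show what length-based stripping does to an ID that never received the prefix.

```python
def naive_label(task_id: str, group_id: str) -> str:
    """Hypothetical label logic: drop an assumed "<group_id>." prefix by length."""
    return task_id[len(group_id) + 1:]


def safe_label(task_id: str, group_id: str) -> str:
    """Hypothetical alternative: strip the prefix only if it is actually present."""
    prefix = f"{group_id}."
    return task_id[len(prefix):] if task_id.startswith(prefix) else task_id


# A prefixed ID renders correctly.
print(naive_label("group.plus_one", "group"))  # plus_one
# Unprefixed IDs (as produced for the mapped tasks) lose their first characters.
print(naive_label("plus_one", "group"))  # ne
print(naive_label("plus_two", "ggg"))    # _two
print(safe_label("plus_one", "group"))   # plus_one
```

Checking for the prefix before stripping, as safe_label does, avoids the truncation, but it would hide the missing prefix rather than fix it.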

What you think should happen instead

These dynamic tasks inside of a group should have the group_id prepended for consistent behavior.
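A toy model of why the prefix matters for the duplicate-ID symptom. ToyDag, DuplicateTaskId, and child_id are hypothetical stand-ins, not Airflow's classes; the sketch assumes only that a DAG rejects an already-registered task_id and that a group prefix has the form <group_id>.<task_id>.

```python
class DuplicateTaskId(Exception):
    """Hypothetical stand-in for airflow.exceptions.DuplicateTaskIdFound."""


class ToyDag:
    def __init__(self) -> None:
        self.task_ids = set()

    def add_task(self, task_id: str) -> None:
        # Reject an ID that is already registered in this DAG.
        if task_id in self.task_ids:
            raise DuplicateTaskId(task_id)
        self.task_ids.add(task_id)


def child_id(group_id: str, task_id: str) -> str:
    # Assumed prefix format: "<group_id>.<task_id>".
    return f"{group_id}.{task_id}"


# With prefixes, the same task name in two groups is fine.
prefixed = ToyDag()
for group in ("group", "ggg"):
    prefixed.add_task(child_id(group, "plus_one"))

# Without prefixes (the behavior reported for mapped tasks), the second add fails.
unprefixed = ToyDag()
unprefixed.add_task("plus_one")
try:
    unprefixed.add_task("plus_one")
except DuplicateTaskId as exc:
    print(f"duplicate task_id: {exc}")
```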

How to reproduce

#!/usr/bin/env python3
import datetime

from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup


@dag(
    start_date=datetime.datetime(2022, 7, 19),
    schedule_interval=None,
)
def test_dag():
    with TaskGroup(group_id='group'):
        @task
        def plus_one(x: int):
            return x + 1

        plus_one.expand(x=[1, 2, 3])

    with TaskGroup(group_id='ggg'):
        @task
        def plus_two(x: int):
            return x + 2

        plus_two.expand(x=[1, 2, 3])


dag = test_dag()


if __name__ == '__main__':
    dag.cli()

Operating System

CentOS Stream 8

Versions of Apache Airflow Providers

N/A

Deployment

Other

Deployment details

Standalone

Anything else

Possibly related: #12309

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

Gollum999 (Jul 19 '22 18:07)

OK, I found the issue. If the groups are expanded on the initial page load, the labels are correct. If a user clicks to expand a group, the labels are not updated. I'm creating a PR to fix this now.

bbovenzi (Jul 21 '22 11:07)

I appreciate the fast response, but could we re-open this? #25217 only fixes one of the three symptoms that I mentioned above.

I think the "root" fix is to prepend the group_id to MappedOperator.task_id, as BaseOperator does:

if task_group:
    self.task_id = task_group.child_id(task_id)
else:
    self.task_id = task_id

Gollum999 (Jul 21 '22 13:07)

Can I take this, @bbovenzi?

erdos2n (Jul 21 '22 21:07)

@erdos2n go for it!

bbovenzi (Jul 21 '22 21:07)

I don't know if I can assign myself...

erdos2n (Jul 21 '22 21:07)

> I think the "root" fix is to prepend the group_id to MappedOperator.task_id, like what BaseOperator does

This logic already exists; a @task-decorated task calls get_unique_task_id, which calls task_group.child_id. The root cause will be deeper than this.

uranusjr (Jul 22 '22 04:07)

Same core issue as https://github.com/apache/airflow/issues/23621.

> This logic already exists; a @task-decorated task calls get_unique_task_id, which calls task_group.child_id. The root cause will be deeper than this.

As far as I can tell, this is not correct. This is the cause I found back when I opened that issue:

At the end of the day it seems that

        if task_group:
            self.task_id = task_group.child_id(task_id)

is only executed by subclasses of BaseOperator. MappedOperator does not inherit from it, so it never executes the line that would correctly prefix the task_id; hence the duplicate-key error thrown later, when self.dag.add_task is called. A simple fix might be to add those two lines to MappedOperator too, although there are other places that modify the task_id.

EdwardRadical (Aug 01 '22 23:08)
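The distinction drawn in the comment above can be sketched with two toy classes. ToyBaseOperator and ToyMappedOperator are hypothetical stand-ins, not Airflow's classes: the first mirrors the quoted two-line snippet, the second stores task_id unchanged and does not inherit from the first. The prefix format <group_id>.<task_id> is assumed.

```python
from typing import Optional


def child_id(group_id: Optional[str], task_id: str) -> str:
    # Assumed prefix format "<group_id>.<task_id>"; no group means no prefix.
    return f"{group_id}.{task_id}" if group_id else task_id


class ToyBaseOperator:
    def __init__(self, task_id: str, group_id: Optional[str] = None) -> None:
        # Mirrors the quoted snippet: prefix when the task belongs to a group.
        self.task_id = child_id(group_id, task_id)


class ToyMappedOperator:
    # Deliberately not a subclass of ToyBaseOperator.
    def __init__(self, task_id: str, group_id: Optional[str] = None) -> None:
        # group_id is accepted but ignored: no prefixing happens here.
        self.task_id = task_id


groups = ("group", "ggg")
regular_ids = [ToyBaseOperator("plus", g).task_id for g in groups]
mapped_ids = [ToyMappedOperator("plus", g).task_id for g in groups]
print(regular_ids)  # ['group.plus', 'ggg.plus']
print(mapped_ids)   # ['plus', 'plus'], which would collide in one DAG
```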

MappedOperator does not call the function itself; it receives the already-converted task_id from its caller, either when partial is called on a BaseOperator subclass or when expand is called on a decorated task. Search the code base for child_id and get_unique_task_id, and you will find them called when a task is expanded.

uranusjr (Aug 02 '22 05:08)
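A toy model of the division of labor described in the comment above, assuming (as stated there) that the caller converts the task_id before the mapped operator is built. ToyGroup, toy_partial, and ToyMappedOperator are hypothetical names, not Airflow APIs. The last line shows one consequence: if the mapped operator's constructor also applied child_id, IDs that arrive already converted would be prefixed twice.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyGroup:
    group_id: str

    def child_id(self, label: str) -> str:
        # Assumed prefix format "<group_id>.<label>".
        return f"{self.group_id}.{label}"


@dataclass
class ToyMappedOperator:
    # Stores the task_id it receives; it does not prefix on its own.
    task_id: str


def toy_partial(task_id: str, group: Optional[ToyGroup] = None) -> ToyMappedOperator:
    # The caller converts the ID before the mapped operator is constructed.
    if group is not None:
        task_id = group.child_id(task_id)
    return ToyMappedOperator(task_id=task_id)


op = toy_partial("plus_one", ToyGroup("group"))
print(op.task_id)  # group.plus_one

# Prefixing again inside the constructor would double the prefix on this path.
print(ToyGroup("group").child_id(op.task_id))  # group.group.plus_one
```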

@andreafromcape, since you understand how things work, I urge you to provide a PR fixing the problem, along with a unit test for it. That will make for a far more productive discussion, once you go through the deep dive and check exactly how it works. I am not saying that anything said here is correct or incorrect, but in any case you seem to be smart and knowledgeable, so the best way to show it (and help others) is to provide a fix. Airflow is created by more than 2100 contributors, and you can become one if you think you can help others.

I think this will be far more productive than pointing out who is wrong or right.

potiuk (Aug 02 '22 19:08)

No, @uranusjr, there is more to it. This is the complete snippet:

    task_group = task_group or TaskGroupContext.get_current_task_group(dag)
    tg_task_id = task_group.child_id(task_id) if task_group else task_id

    if tg_task_id not in dag.task_ids:
        return task_id

This happens when expand is called: although tg_task_id does contain the correct label, it is only used for the membership check. Because tg_task_id not in dag.task_ids evaluates to True (the task has not been added to the DAG yet), get_unique_task_id returns the bare task_id.

This was probably a rough two-line shortcut to avoid an obvious collision with this other snippet in BaseOperator's __init__, which calls the exact same function (task_group.child_id(task_id)):

        if task_group:
            self.task_id = task_group.child_id(task_id)
        else:
            self.task_id = task_id

So although get_unique_task_id does not return a unique (group-prefixed) task_id when the task belongs to a group, in most cases BaseOperator fixes the task_id later. However, the snippet above is not in MappedOperator's __init__, and MappedOperator does not inherit from BaseOperator, so it never runs that __init__. As a result, mapped tasks with the same name in different groups end up with duplicate task_ids.
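The early return in the quoted excerpt can be reproduced with a simplified model. unique_task_id_model is hypothetical, not Airflow's function: it takes the group_id as a plain string, assumes the prefix format <group_id>.<task_id>, and replaces the real suffix numbering on collision with a fixed __1 placeholder. It shows how two groups can both receive the same bare ID.

```python
from typing import AbstractSet, Optional


def unique_task_id_model(
    task_id: str, existing: AbstractSet[str], group_id: Optional[str] = None
) -> str:
    """Simplified model of the quoted get_unique_task_id excerpt."""
    tg_task_id = f"{group_id}.{task_id}" if group_id else task_id
    if tg_task_id not in existing:
        # The group-qualified ID is only used for this check;
        # the bare task_id is what gets returned.
        return task_id
    # Simplified placeholder for the real suffix numbering on collision.
    return f"{task_id}__1"


registered = set()
for group in ("group", "ggg"):
    new_id = unique_task_id_model("plus", registered, group)
    print(group, "->", new_id)
    # A real DAG would raise on the second add instead of silently merging.
    registered.add(new_id)
print(registered)  # {'plus'}: both groups produced the same bare ID
```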

I am explaining this to you, @uranusjr, because although I know where the issue is and how it manifests, and could perhaps patch it, I recognize that I do not have the most knowledge of the codebase here, and any fix I write will most likely not be in the spirit of the architecture. As a major contributor to AIP-42, you are the one with expert knowledge. So I can either keep trying to convince you for a couple of messages, give up, and create a PR based on my flawed knowledge, or, in the spirit of OSS, you can hear me out and we can work out how best to fix it without introducing dangerous assumptions or unsound logic.

Given how the bug seems to occur, this is a candidate fix, to the best of my knowledge:

===================================================================
diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
--- a/airflow/decorators/base.py	(revision 298be502c35006b7c3f011b676dbb4db0633bc74)
+++ b/airflow/decorators/base.py	(date 1659517989535)
@@ -359,6 +359,9 @@
         partial_kwargs.update(task_kwargs)
 
         task_id = get_unique_task_id(partial_kwargs.pop("task_id"), dag, task_group)
+        if task_group:
+            task_id = task_group.child_id(task_id)
+
         params = partial_kwargs.pop("params", None) or default_params
 
         # Logic here should be kept in sync with BaseOperatorMeta.partial().

Let me know what you think of it.

EdwardRadical (Aug 03 '22 09:08)
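The effect of the candidate diff can be checked with a simplified model. This is a sketch under stated assumptions, not the patch itself: unique_task_id_model is a hypothetical stand-in for get_unique_task_id, patched_expand_task_id mirrors only the two added lines, the prefix format <group_id>.<task_id> is assumed, and collision suffixing is simplified to a fixed __1.

```python
from typing import AbstractSet, Optional


def unique_task_id_model(
    task_id: str, existing: AbstractSet[str], group_id: Optional[str] = None
) -> str:
    # Hypothetical, simplified stand-in for get_unique_task_id.
    tg_task_id = f"{group_id}.{task_id}" if group_id else task_id
    if tg_task_id not in existing:
        return task_id
    return f"{task_id}__1"  # simplified placeholder for real suffix numbering


def patched_expand_task_id(
    task_id: str, existing: AbstractSet[str], group_id: Optional[str] = None
) -> str:
    # Mirrors the diff: uniquify first, then apply the group prefix.
    task_id = unique_task_id_model(task_id, existing, group_id)
    if group_id:
        task_id = f"{group_id}.{task_id}"
    return task_id


registered = set()
for group in ("group", "ggg"):
    registered.add(patched_expand_task_id("plus", registered, group))
print(sorted(registered))  # ['ggg.plus', 'group.plus']

# A second "plus" in the same group gets a suffix and stays prefixed.
print(patched_expand_task_id("plus", registered, "group"))  # group.plus__1
```

Because the prefix is applied after uniquifying, any collision suffix lands on the bare name (group.plus__1 in this model). The model covers only the task_id; the webserver label is a separate concern.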

@EdwardRadical Will your change fix just the ID, or the label too? If it's both, then I say go for it, because we definitely need to fix how we create the label in the webserver.

Also, related: https://github.com/apache/airflow/issues/25523

bbovenzi (Aug 08 '22 16:08)