airflow
Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id
Apache Airflow version
2.3.3 (latest released)
What happened
As the title states, if you have dynamically mapped tasks inside of a TaskGroup, those tasks do not get the group_id prepended to their respective task_ids. This causes at least a couple of undesirable side effects:
- Task names are truncated in the Grid/Graph* view. The tasks below are named plus_one and plus_two.
Presumably this is because the UI normally strips off the group_id prefix.
* Graph View was very inconsistent in my experience. Sometimes the names are truncated, and sometimes they render correctly. I haven't figured out the pattern behind this behavior.
- Duplicate task_ids between groups result in an airflow.exceptions.DuplicateTaskIdFound, even if the group_id would normally disambiguate them.
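A minimal sketch of the presumed label truncation, assuming the UI builds a display label by slicing off `len(group_id) + 1` characters (the group id plus the dot). The helper `strip_group_prefix` is hypothetical and is not Airflow's webserver code; it only illustrates why an unprefixed id loses characters instead of a prefix:

```python
def strip_group_prefix(task_id: str, group_id: str) -> str:
    """Hypothetical UI helper: drop the leading "<group_id>." from a task_id.

    It slices blindly by length, so it assumes the prefix is actually present.
    """
    return task_id[len(group_id) + 1:]


# A correctly prefixed id yields the expected label.
print(strip_group_prefix("group.plus_one", "group"))  # plus_one
# Mapped tasks arrive without the prefix, so real characters are cut instead.
print(strip_group_prefix("plus_one", "group"))  # ne
print(strip_group_prefix("plus_two", "ggg"))  # _two
```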
What you think should happen instead
These dynamic tasks inside of a group should have the group_id prepended for consistent behavior.
How to reproduce
```python
#!/usr/bin/env python3
import datetime

from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup


@dag(
    start_date=datetime.datetime(2022, 7, 19),
    schedule_interval=None,
)
def test_dag():
    with TaskGroup(group_id='group'):
        @task
        def plus_one(x: int):
            return x + 1

        plus_one.expand(x=[1, 2, 3])

    with TaskGroup(group_id='ggg'):
        @task
        def plus_two(x: int):
            return x + 2

        plus_two.expand(x=[1, 2, 3])


dag = test_dag()

if __name__ == '__main__':
    dag.cli()
```
Operating System
CentOS Stream 8
Versions of Apache Airflow Providers
N/A
Deployment
Other
Deployment details
Standalone
Anything else
Possibly related: #12309
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
OK, I found the issue. If the groups are expanded on the initial page load, the labels are correct. If a user clicks to expand a group, the labels are not updated. I'm creating a PR to fix this now.
I appreciate the fast response, but could we re-open this? #25217 only fixes one of the three symptoms that I mentioned above.
I think the "root" fix is to prepend the group_id to MappedOperator.task_id, like BaseOperator does:
```python
if task_group:
    self.task_id = task_group.child_id(task_id)
else:
    self.task_id = task_id
```
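The prefixing that this BaseOperator snippet relies on can be modeled roughly as follows. `ToyTaskGroup` and `ToyBaseOperator` are hypothetical stand-ins, and `child_id` here is a simplified model of `TaskGroup.child_id` (prefix with `<group_id>.` when `prefix_group_id` is enabled), not a copy of Airflow's source:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyTaskGroup:
    """Simplified, hypothetical stand-in for airflow.utils.task_group.TaskGroup."""
    group_id: Optional[str]
    prefix_group_id: bool = True

    def child_id(self, label: str) -> str:
        # Prefix the child's label with "<group_id>." when prefixing is enabled.
        if self.prefix_group_id and self.group_id:
            return f"{self.group_id}.{label}"
        return label


class ToyBaseOperator:
    def __init__(self, task_id: str, task_group: Optional[ToyTaskGroup] = None):
        # Same shape as the BaseOperator snippet quoted above.
        if task_group:
            self.task_id = task_group.child_id(task_id)
        else:
            self.task_id = task_id


print(ToyBaseOperator("plus_one", ToyTaskGroup("group")).task_id)  # group.plus_one
print(ToyBaseOperator("plus_one").task_id)  # plus_one
```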
Can I take this, @bbovenzi?
@erdos2n go for it!
I don't know if I can assign myself...
> I think the "root" fix is to prepend the group_id to MappedOperator.task_id, like what BaseOperator does
This logic already exists; a @task-decorated task calls get_unique_task_id, which calls task_group.child_id. The root cause will be deeper than this.
Same core issue as https://github.com/apache/airflow/issues/23621.
> This logic already exists; a @task-decorated task calls get_unique_task_id, which calls task_group.child_id. The root cause will be deeper than this.
As far as I can tell, this is not correct. This is the reason I found back when I opened it:
At the end of the day, it seems that
if task_group: self.task_id = task_group.child_id(task_id)
is only executed by subclasses of BaseOperator. MappedOperator does not inherit from it, so it never executes the line that would correctly prefix the task_id; hence the duplicate key error thrown later, when self.dag.add_task is called. Maybe a simple fix would be to add those two lines to MappedOperator too, although there are other places that fiddle with the task_id.
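A toy model of the inheritance gap described above, assuming (as the comment states) that prefixing lives only in BaseOperator's constructor. All classes here are hypothetical stand-ins, group membership is reduced to a plain `group_id` string, and `ToyDuplicateTaskIdFound` only mimics the role of `airflow.exceptions.DuplicateTaskIdFound`:

```python
from typing import Dict, Optional


class ToyDuplicateTaskIdFound(Exception):
    """Hypothetical stand-in for airflow.exceptions.DuplicateTaskIdFound."""


class ToyDag:
    def __init__(self) -> None:
        self.task_dict: Dict[str, object] = {}

    def add_task(self, task) -> None:
        # Registering two tasks under the same id is an error.
        if task.task_id in self.task_dict:
            raise ToyDuplicateTaskIdFound(task.task_id)
        self.task_dict[task.task_id] = task


class ToyBaseOperator:
    def __init__(self, task_id: str, group_id: Optional[str] = None) -> None:
        # Prefixing happens in the constructor, so every subclass inherits it.
        self.task_id = f"{group_id}.{task_id}" if group_id else task_id


class ToyMappedOperator:  # deliberately NOT a ToyBaseOperator subclass
    def __init__(self, task_id: str, group_id: Optional[str] = None) -> None:
        # No prefixing here: group_id is accepted but never applied.
        self.task_id = task_id


dag = ToyDag()
dag.add_task(ToyBaseOperator("plus", "a"))
dag.add_task(ToyBaseOperator("plus", "b"))  # fine: "a.plus" vs "b.plus"
dag.add_task(ToyMappedOperator("plus", "a"))
try:
    dag.add_task(ToyMappedOperator("plus", "b"))  # both register as "plus"
except ToyDuplicateTaskIdFound as exc:
    print(f"duplicate: {exc}")  # duplicate: plus
```

Adding the same two-line prefixing to `ToyMappedOperator.__init__` would make the last call succeed, which is the "simple fix" floated in the comment.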
MappedOperator does not call the function itself; it receives the already-converted task_id when BaseOperator's partial or expand is called. Search for child_id and get_unique_task_id in the code base, and you will find them called when a task is expanded.
@andreafromcape - since you understand how things work, I urge you to provide a PR fixing the problem, with a unit test covering it. That will make for a far more productive discussion, once you go through the whole deep dive and check exactly how it works. I am not saying anything said here is correct or incorrect, but in any case you seem to be smart and knowledgeable, so the best way to show it (and help others) is to provide a fix. Airflow is created by more than 2100 contributors, and you can become one if you think you can help others.
I think this will be far more productive than pointing out who is wrong or right.
No, @uranusjr, there is more to it. This is the complete snippet:
```python
task_group = task_group or TaskGroupContext.get_current_task_group(dag)
tg_task_id = task_group.child_id(task_id) if task_group else task_id
if tg_task_id not in dag.task_ids:
    return task_id
```
This happens at the time expand is called: although tg_task_id does contain the correctly prefixed label, it is ignored, because tg_task_id not in dag.task_ids evaluates to True (the task has not been added to the DAG yet), so get_unique_task_id returns the bare task_id.
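A minimal re-creation of the quoted branch, to make the "check the prefixed id, return the bare id" behavior concrete. `toy_get_unique_task_id` is hypothetical and takes a plain set of registered ids instead of a DAG; only the first branch is quoted in this thread, so the `__N` suffixing below is a simplified assumption, not Airflow's actual code:

```python
from typing import Optional, Set


def toy_get_unique_task_id(
    task_id: str, dag_task_ids: Set[str], group_id: Optional[str] = None
) -> str:
    """Hypothetical re-creation of the quoted get_unique_task_id snippet."""
    tg_task_id = f"{group_id}.{task_id}" if group_id else task_id
    if tg_task_id not in dag_task_ids:
        # The membership check uses the prefixed id, but the *bare* id is returned.
        return task_id
    # Simplified stand-in for the suffixing branch the snippet elides:
    # pick the first "__N" whose prefixed form is still free.
    n = 1
    while f"{tg_task_id}__{n}" in dag_task_ids:
        n += 1
    return f"{task_id}__{n}"


print(toy_get_unique_task_id("plus_one", set(), "group"))  # plus_one
print(toy_get_unique_task_id("plus_one", {"group.plus_one"}, "group"))  # plus_one__1
```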
This was probably a rough two-line workaround to avoid the obvious collision with this other snippet inside BaseOperator's __init__, which calls the exact same function (task_group.child_id(task_id)):
```python
if task_group:
    self.task_id = task_group.child_id(task_id)
else:
    self.task_id = task_id
```
So although get_unique_task_id does not return a unique (group-prefixed) task_id when the task belongs to a group, in most use cases the task_id is fixed later by BaseOperator. However, the snippet above is not contained in MappedOperator's __init__, and MappedOperator does not inherit from BaseOperator either, so it never runs BaseOperator's __init__. As a consequence, the result is a duplicate.
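The trade-off described above can be sketched with plain functions. Everything here is a hypothetical simplification that assumes the behavior described in this thread: get_unique_task_id hands back the bare id, BaseOperator prefixes whatever id it receives, and MappedOperator stores the id unchanged. It shows why returning the bare id is right for BaseOperator (no double prefix) but loses the prefix for mapped tasks:

```python
from typing import Optional


def child_id(group_id: Optional[str], label: str) -> str:
    # Simplified model of TaskGroup.child_id.
    return f"{group_id}.{label}" if group_id else label


def base_operator_task_id(task_id: str, group_id: Optional[str]) -> str:
    # BaseOperator.__init__ (per the quoted snippet) prefixes the id it is handed.
    return child_id(group_id, task_id)


def mapped_operator_task_id(task_id: str, group_id: Optional[str]) -> str:
    # MappedOperator (per this thread) stores the id as-is; group_id is unused.
    return task_id


group = "group"
bare = "plus_one"                 # what get_unique_task_id returns
prefixed = child_id(group, bare)  # what it checked against dag.task_ids

print(base_operator_task_id(bare, group))      # group.plus_one  (correct)
print(base_operator_task_id(prefixed, group))  # group.group.plus_one  (double prefix)
print(mapped_operator_task_id(bare, group))    # plus_one  (prefix lost)
```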
The reason I am explaining this to you, @uranusjr, even though I know where the issue is, how it manifests, and could probably patch it, is that I recognize I am not the one with the most knowledge of the environment here, and any fix I create will most likely not be in the spirit of the architecture. As a major contributor to AIP-42, you are the one with expert knowledge. So I can keep trying to convince you for a couple more messages, give up, and create a PR out of my flawed knowledge; or, in the spirit of OSS, you can hear me out and I can work out how best to fix it without introducing dangerous assumptions or unsound logic.
Given how the bug seems to happen, this is a candidate fix, to the best of my knowledge:
```diff
===================================================================
diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
--- a/airflow/decorators/base.py (revision 298be502c35006b7c3f011b676dbb4db0633bc74)
+++ b/airflow/decorators/base.py (date 1659517989535)
@@ -359,6 +359,9 @@
         partial_kwargs.update(task_kwargs)
 
         task_id = get_unique_task_id(partial_kwargs.pop("task_id"), dag, task_group)
+        if task_group:
+            task_id = task_group.child_id(task_id)
+
         params = partial_kwargs.pop("params", None) or default_params
 
         # Logic here should be kept in sync with BaseOperatorMeta.partial().
```
Let me know what you think of it.
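A toy model of what the candidate patch changes, assuming the diff touches only the decorator's expand path (so ordinary `@task` calls still rely on BaseOperator's own prefixing and are prefixed once). `toy_expand_task_id` and its `patched` flag are hypothetical; the suffixing branch of get_unique_task_id is deliberately not modeled, and this addresses the task id only, not how the webserver builds labels:

```python
from typing import Optional, Set


def child_id(group_id: Optional[str], label: str) -> str:
    # Simplified model of TaskGroup.child_id.
    return f"{group_id}.{label}" if group_id else label


def toy_expand_task_id(
    task_id: str, dag_task_ids: Set[str], group_id: Optional[str], patched: bool
) -> str:
    """Hypothetical model of the task_id computed in the decorator's expand path."""
    # Quoted get_unique_task_id behavior (first branch only): bare id comes back.
    if child_id(group_id, task_id) not in dag_task_ids:
        unique = task_id
    else:
        raise NotImplementedError("suffixing branch not modeled here")
    # The proposed patch: prefix explicitly, since MappedOperator will not.
    return child_id(group_id, unique) if patched else unique


registered: Set[str] = set()
for gid, name in [("group", "plus"), ("ggg", "plus")]:
    registered.add(toy_expand_task_id(name, registered, gid, patched=True))
print(sorted(registered))  # ['ggg.plus', 'group.plus']
```

Without the patch, both calls would yield the bare `plus`, which is the collision reported in the issue.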
@EdwardRadical Will your change fix just the id, or the label too? If it's both, then I say go for it, because we definitely need to fix how we create the label in the webserver.
Also, related: https://github.com/apache/airflow/issues/25523