Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task
Apache Airflow version
3.0.6
If "Other Airflow 2 version" selected, which one?
No response
What happened?
Summary
Since Airflow 3.0 (tested on 3.0.6), it is impossible to use expand on a @task_group containing a task decorated with @task.kubernetes_cmd.
Expansion works correctly for a standalone @task.kubernetes_cmd task, or for a @task_group containing a @task.kubernetes task, but fails in the specific case of a task group + kubernetes_cmd.
The mapped data is not correctly interpreted or propagated.
The reproduction code under "How to reproduce" raises an error or does not correctly map the values to the task inside the TaskGroup (echo_cmd).
Note: If you replace @task.kubernetes_cmd with @task.kubernetes, it works as expected.
Expected behavior
- Expansion (expand) on a TaskGroup containing a @task.kubernetes_cmd task should work just like it does with other TaskFlow-compatible tasks.
Observed behavior
- The mapping/expand does not pass the expected values to the internal task within the TaskGroup.
- The value injected into the internal task is a MappedArgument object (e.g., <airflow.models.xcom_arg.MappedArgument object at ...>) instead of the actual value (e.g. "foo", "bar").
- There is no similar issue with @task.kubernetes in the same scenario.
Technical details
- When mapping via expand on a @task_group containing a @task.kubernetes_cmd task, the value received by the internal task is an unresolved MappedArgument object.
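A toy model may help show what "unresolved" means here. It assumes, as a simplification rather than a description of Airflow's internals, that a mapped task group calls its function once at parse time with placeholder arguments (Airflow's MappedArgument), and that each mapped task instance later swaps every placeholder for the value at its own map index during a render step. `Placeholder` and `render` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass


@dataclass
class Placeholder:
    """Hypothetical stand-in for MappedArgument: remembers the expand input and key."""

    source: dict
    key: str

    def resolve(self, map_index: int):
        # Each mapped task instance picks its own element by map index.
        return self.source[self.key][map_index]


def render(value, map_index: int):
    """Toy render step: replace placeholders, recursing into lists, tuples and dicts."""
    if isinstance(value, Placeholder):
        return value.resolve(map_index)
    if isinstance(value, list):
        return [render(v, map_index) for v in value]
    if isinstance(value, tuple):
        return tuple(render(v, map_index) for v in value)
    if isinstance(value, dict):
        return {k: render(v, map_index) for k, v in value.items()}
    return value


# The group function runs once, so echo_cmd receives a placeholder, not "foo".
expand_input = {"name": ["foo", "bar", "baz"]}
op_kwargs = {"name": Placeholder(expand_input, "name")}

# With the render step, every map index gets its own real value.
rendered = [render(op_kwargs, i) for i in range(3)]
print(rendered)  # [{'name': 'foo'}, {'name': 'bar'}, {'name': 'baz'}]

# Building the command from unrendered kwargs keeps the placeholder,
# which matches the symptom reported for @task.kubernetes_cmd.
unrendered_cmd = ["echo", op_kwargs["name"]]
print(isinstance(unrendered_cmd[1], Placeholder))  # True
```

In this model, any code path that builds the command without going through the render step ends up holding the placeholder; where exactly that happens in the provider is not established by this sketch.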
Environment
- Airflow: 3.0.6
- Deployment: via the official Helm chart (apache/airflow)
- Image used: apache/airflow:3.0.6
- Python version: not explicitly set, using the official image default
Notes / leads
- It seems that expand/mapping on TaskGroups does not work with this specific decorator, while it does with others.
- No existing issue found as of October 2025.
Thanks for your help and for all your great work on Airflow!
What you think should happen instead?
No response
How to reproduce
from airflow.decorators import dag, task, task_group
from datetime import datetime


@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def demo():
    @task
    def make_values():
        return ["foo", "bar", "baz"]

    @task.kubernetes_cmd(image="alpine")
    def echo_cmd(name):
        return ["echo", name]

    @task_group
    def group_task(name):
        echo_cmd(name)

    # This works:
    # echo_cmd.expand(name=make_values())
    # This does NOT work:
    group_task.expand(name=make_values())


dag = demo()
Operating System
apache/airflow:3.0.6
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
+1, I'm experiencing the same issue on Airflow
+1, I can confirm this is happening on Airflow.
Can someone please include the full error?
ERROR - Task failed with exception: source="task"
TypeError: Expected echo_cmd to return a list of strings, but got ['echo', MappedArgument(_input=DictOfListsExpandInput(value={'name': XComArg(<Task(_PythonDecoratedOperator): make_values>)}), _key='name')]
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920 in run
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1215 in _execute_task
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 397 in wrapper
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py", line 72 in execute
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py", line 92 in _generate_cmds
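Reading the error message, the command list returned by `echo_cmd` still contains the unresolved object instead of a string, so a "list of strings" check fails. The sketch below is a hypothetical reconstruction of such a check based only on the message text; `check_cmds` and `FakeMappedArgument` are illustrative names, not the provider's code. It shows the same function passing with a resolved value and failing with an unresolved one.

```python
def check_cmds(func_name: str, result) -> list:
    """Hypothetical check: the decorated function must return a list of str."""
    if not isinstance(result, list) or not all(isinstance(item, str) for item in result):
        raise TypeError(
            f"Expected {func_name} to return a list of strings, but got {result!r}"
        )
    return result


class FakeMappedArgument:
    """Hypothetical stand-in for an unresolved MappedArgument."""

    def __repr__(self) -> str:
        return "MappedArgument(...)"


# A resolved value passes the check.
print(check_cmds("echo_cmd", ["echo", "foo"]))  # ['echo', 'foo']

# An unresolved placeholder inside the list fails it.
error_message = ""
try:
    check_cmds("echo_cmd", ["echo", FakeMappedArgument()])
except TypeError as exc:
    error_message = str(exc)
print(error_message)
```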
Hello @ashb, I don't know where you are with this issue, but in case it helps, I think this issue might actually be caused by a TaskGroup combined with a template field that references another template field, rather than by TaskGroup + kubernetes_cmd specifically.
I think this DAG showcases the issue:
from airflow.sdk import BaseOperator, dag, task_group, task
from datetime import datetime


class CustomOperator(BaseOperator):
    template_fields = (
        "raw_input_file",
        "input_file",
    )

    def __init__(self, input_file, **kwargs):
        self.raw_input_file = input_file
        # This line causes the error with MappedArgument inside of taskGroup in Airflow 3.x
        self.input_file = "{{ task.raw_input_file }}"
        super().__init__(**kwargs)

    def execute(self, context):
        # Always print the raw file name
        print(f"Raw file: {self.raw_input_file}")
        # Print the raw file name if task is mapped directly
        # But print MappedArgument(_input=DictOfListsExpandInput(value={'input_file': ['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv', 's3://bucket/path/to/file3.csv']}), _key='input_file')
        # if task is mapped inside of taskGroup
        print(f"Derived file: {self.input_file}")
        return self.raw_input_file


@dag(
    dag_id="test_mapped_argument_split_issue",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
)
def test_mapped_argument_split():
    """Reproduce the MappedArgument split error"""
    input_files = [
        "s3://bucket/path/to/file1.csv",
        "s3://bucket/path/to/file2.csv",
        "s3://bucket/path/to/file3.csv",
    ]

    @task_group()
    def test_group(
        input_file: str,
    ) -> CustomOperator:
        return CustomOperator(input_file=input_file, task_id="custom_operator")

    test_group.expand(input_file=input_files)
    CustomOperator.partial(task_id="custom_operator").expand(input_file=input_files)


test_dag = test_mapped_argument_split()
This is the resulting graph:
The task mapped through test_group has a log along these lines:
Raw file: s3://bucket/path/to/file3.csv
Derived file: MappedArgument(_input=DictOfListsExpandInput(value={'input_file': ['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv', 's3://bucket/path/to/file3.csv']}), _key='input_file')
while the task mapped directly has a log along these lines:
Raw file: s3://bucket/path/to/file3.csv
Derived file: s3://bucket/path/to/file3.csv
So mapping the task directly works as expected (like in 2.x), while mapping it through a TaskGroup does not.
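In both logs, `raw_input_file` is already resolved by the time `execute()` runs. That suggests a possible workaround, untested against Airflow: derive the value inside `execute()` instead of through a second template field set in `__init__`. The plain-Python toy below only illustrates why deriving after rendering sees the resolved value; `Placeholder` and `ToyOperator` are hypothetical stand-ins for MappedArgument and CustomOperator, and the render step is a simplification.

```python
class Placeholder:
    """Hypothetical stand-in for MappedArgument."""

    def __init__(self, values):
        self.values = values

    def resolve(self, map_index):
        return self.values[map_index]


class ToyOperator:
    """Hypothetical stand-in for CustomOperator with a single template field."""

    template_fields = ("raw_input_file",)

    def __init__(self, input_file):
        self.raw_input_file = input_file

    def render_template_fields(self, map_index):
        # Toy render step: resolve placeholders held directly in template fields.
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, Placeholder):
                setattr(self, name, value.resolve(map_index))

    def execute(self):
        # Derive the value here, after rendering, from the resolved raw field.
        return f"toto/{self.raw_input_file}"


files = ["s3://bucket/path/to/file1.csv", "s3://bucket/path/to/file2.csv"]
op = ToyOperator(Placeholder(files))
op.render_template_fields(map_index=1)  # in the toy, rendering happens before execute()
print(op.execute())  # toto/s3://bucket/path/to/file2.csv
```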
I haven't started on this issue; I've only done some preliminary triage.
Small update: I'm not 100% sure it is the same thing, but I found that you can also reproduce the issue by using an f-string instead of referencing another template field. In case it's useful, here is the example above adapted for the TaskGroup case:
from datetime import datetime

from airflow.sdk import BaseOperator, dag, task_group


class CustomOperator(BaseOperator):
    template_fields = (
        "input_file",
    )

    def __init__(self, input_file, **kwargs):
        # This line causes the error with MappedArgument inside of taskGroup in Airflow 3.x
        self.input_file = f"toto/{input_file}"
        super().__init__(**kwargs)

    def execute(self, context):
        print(f"file: {self.input_file}")
        return self.input_file


@dag(
    dag_id="test_mapped_argument_split_issue3",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
)
def test_mapped_argument_split():
    """Reproduce the MappedArgument split error"""
    input_files = [
        "s3://bucket/path/to/file1.csv",
        "s3://bucket/path/to/file2.csv",
        "s3://bucket/path/to/file3.csv",
    ]

    @task_group()
    def test_group(
        input_file: str,
    ) -> CustomOperator:
        return CustomOperator(input_file=f"{input_file}", task_id="custom_operator")

    test_group.expand(input_file=input_files)


test_dag = test_mapped_argument_split()
It prints:
file: toto/MappedArgument(_input=DictOfListsExpandInput(value={'input_file': ['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv', 's3://bucket/path/to/file3.csv']}), _key='input_file')
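The f-string variant is presumably the easiest to explain: the f-string is evaluated when the task group function runs at parse time, so the placeholder's repr is baked into an ordinary string and nothing is left for runtime rendering to resolve. A minimal plain-Python illustration, with hypothetical `Placeholder` and `render` standing in for MappedArgument and template rendering:

```python
class Placeholder:
    """Hypothetical stand-in for MappedArgument; only its repr matters here."""

    def __init__(self, key):
        self.key = key

    def __repr__(self):
        return f"Placeholder(key={self.key!r})"


def render(value, resolved):
    """Toy render step: swap a Placeholder for its value, leave anything else alone."""
    return resolved if isinstance(value, Placeholder) else value


arg = Placeholder("input_file")
direct = arg               # passed through untouched, still resolvable later
formatted = f"toto/{arg}"  # f-string runs now and stores the repr as plain text

print(render(direct, "s3://bucket/path/to/file1.csv"))     # s3://bucket/path/to/file1.csv
print(render(formatted, "s3://bucket/path/to/file1.csv"))  # toto/Placeholder(key='input_file')
```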
I want to take a shot at this; can one of the maintainers please assign it to me?
I've put up a PR for this and look forward to getting some eyes on it. cc: @ephraimbuddy @kaxil