
Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task

Open SoinSoin opened this issue 3 months ago • 10 comments

Apache Airflow version

3.0.6

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Summary

Since Airflow 3.0 (tested on 3.0.6), it is impossible to use expand on a @task_group containing a task decorated with @task.kubernetes_cmd.
Expansion works correctly for a standalone @task.kubernetes_cmd task, or for a @task_group containing a @task.kubernetes task, but fails in the specific case of a task group + kubernetes_cmd.
The mapped data is not correctly interpreted or propagated.


The reproduction code below (see "How to reproduce") raises an error, or does not correctly map the values to the internal TaskGroup task (echo_cmd).
Note: If you replace @task.kubernetes_cmd with @task.kubernetes, it works as expected.
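
For reference, a minimal sketch of that working substitution (an illustration only, assuming the cncf.kubernetes provider is installed and reusing the DAG from "How to reproduce" below); only the decorated task changes, the rest of the DAG stays the same:

@task.kubernetes(image="python:3.12-slim")  # image is an assumption for the sketch
def echo_py(name):
    # The callable runs inside the pod; the mapped value arrives here already resolved.
    print(name)

@task_group
def group_task(name):
    echo_py(name)

# group_task.expand(name=make_values()) maps correctly in this variant.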


Expected behavior

  • Expansion (expand) on a TaskGroup containing a @task.kubernetes_cmd task should work just like with other TaskFlow-compatible tasks.

Observed behavior

  • The mapping/expand does not pass the expected values to the internal task within the TaskGroup.
  • The value injected into the internal task is a MappedArgument object (e.g., <airflow.models.xcom_arg.MappedArgument object at ...>) instead of the actual value (e.g. "foo", "bar").
  • There is no similar issue with @task.kubernetes in the same scenario.

Technical details

  • When mapping via expand on a @task_group containing a @task.kubernetes_cmd task, the value received by the internal task is an unresolved MappedArgument object.

Environment

  • Airflow: 3.0.6
  • Deployment: via the official Helm chart (apache/airflow)
  • Image used: apache/airflow:3.0.6
  • Python version: not explicitly set, using the official image default

Notes / leads

  • It seems that expand/mapping on TaskGroups does not work with this specific decorator, while it does with others.
  • No existing issue found as of October 2025.

Thanks for your help and for all your great work on Airflow!

What you think should happen instead?

No response

How to reproduce

from airflow.decorators import dag, task, task_group
from datetime import datetime

@dag(start_date=datetime(2023,1,1), schedule=None, catchup=False)
def demo():
    @task
    def make_values():
        return ["foo", "bar", "baz"]

    @task.kubernetes_cmd(image="alpine")
    def echo_cmd(name):
        return ["echo", name]

    @task_group
    def group_task(name):
        echo_cmd(name)

    # This works:
    # echo_cmd.expand(name=make_values())

    # This does NOT work:
    group_task.expand(name=make_values())

dag = demo()

Operating System

apache/airflow:3.0.6

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

SoinSoin avatar Oct 07 '25 13:10 SoinSoin

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot] avatar Oct 07 '25 13:10 boring-cyborg[bot]

+1 I’m experiencing the same issue on Airflow

Mazen94 avatar Oct 08 '25 06:10 Mazen94

+1, I can confirm this is happening on Airflow.

Nonolabiscotte avatar Oct 08 '25 07:10 Nonolabiscotte

Can someone please include the full error?

ashb avatar Oct 21 '25 09:10 ashb

Can someone please include the full error?

ERROR - Task failed with exception: source="task"
TypeError: Expected echo_cmd to return a list of strings, but got ['echo', MappedArgument(_input=DictOfListsExpandInput(value={'name': XComArg(<Task(_PythonDecoratedOperator): make_values>)}), _key='name')]
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920 in run

File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1215 in _execute_task

File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 397 in wrapper

File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py", line 72 in execute

File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py", line 92 in _generate_cmds

SoinSoin avatar Oct 24 '25 06:10 SoinSoin

Hello @ashb, I don't know where you are with this issue, but in case it helps, I think it might actually be linked to a TaskGroup plus a template field referencing another template field, rather than to TaskGroup plus kubernetes_cmd directly.

I think this DAG showcases the issue:

from airflow.sdk import BaseOperator, dag, task_group, task

from datetime import datetime


class CustomOperator(BaseOperator):
    template_fields = (
        "raw_input_file",
        "input_file",
    )

    def __init__(self, input_file, **kwargs):
        self.raw_input_file = input_file

        # This line causes the error with MappedArgument inside of taskGroup in Airflow 3.x
        self.input_file = "{{ task.raw_input_file }}"
        super().__init__(**kwargs)

    def execute(self, context):
        # Always print the raw file name
        print(f"Raw file: {self.raw_input_file}")
        # Print the raw file name if task is mapped directly
        # But print MappedArgument(_input=DictOfListsExpandInput(value={'input_file': ['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv', 's3://bucket/path/to/file3.csv']}), _key='input_file')
        # if task is mapped inside of taskGroup
        print(f"Derived file: {self.input_file}")
        return self.raw_input_file


@dag(
    dag_id="test_mapped_argument_split_issue",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
)
def test_mapped_argument_split():
    """Reproduce the MappedArgument split error"""
    input_files = [
        "s3://bucket/path/to/file1.csv",
        "s3://bucket/path/to/file2.csv",
        "s3://bucket/path/to/file3.csv",
    ]

    @task_group()
    def test_group(
            input_file: str,
    ) -> CustomOperator:
        return CustomOperator(input_file=input_file, task_id="custom_operator")
    test_group.expand(input_file=input_files)
    CustomOperator.partial(task_id="custom_operator").expand(input_file=input_files)


test_dag = test_mapped_argument_split()

This is the resulting graph:

[DAG graph screenshot omitted]

The tasks mapped through test_group log something along these lines:

Raw file: s3://bucket/path/to/file3.csv
Derived file: MappedArgument(_input=DictOfListsExpandInput(value={'input_file': ['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv', 's3://bucket/path/to/file3.csv']}), _key='input_file')

While the directly mapped task logs something along these lines:

Raw file: s3://bucket/path/to/file3.csv
Derived file: s3://bucket/path/to/file3.csv

So mapping the task directly works as expected (as in 2.x), while mapping it through a task group does not.
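
A possible workaround sketch (an assumption on my side, not a confirmed fix): keep only the raw field templated and derive any dependent value inside execute(), so no template field references another template field:

class CustomOperatorDeferred(BaseOperator):
    # Hypothetical variant of the operator above, for illustration only.
    template_fields = ("raw_input_file",)

    def __init__(self, input_file, **kwargs):
        self.raw_input_file = input_file
        super().__init__(**kwargs)

    def execute(self, context):
        # raw_input_file resolves correctly even when mapped through a task group
        # (as the "Raw file" log line above shows), so derive from it here.
        derived = self.raw_input_file
        print(f"Derived file: {derived}")
        return derived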

tmaurin avatar Oct 30 '25 12:10 tmaurin

I haven't made a start on this issue; I've only done some preliminary triage.

ashb avatar Oct 30 '25 15:10 ashb

Small update: I'm not 100% sure it is the same thing, but I found you can reproduce the issue by using an f-string instead of referencing another template variable. In case it is useful, starting from the above example for the TaskGroup:

from datetime import datetime

from airflow.sdk import BaseOperator, dag, task_group


class CustomOperator(BaseOperator):
    template_fields = (
        "input_file",
    )
    def __init__(self, input_file, **kwargs):
        # This line causes the error with MappedArgument inside of taskGroup in Airflow 3.x
        self.input_file = f"toto/{input_file}"
        super().__init__(**kwargs)

    def execute(self, context):
        print(f"file: {self.input_file}")
        return self.input_file

@dag(
    dag_id="test_mapped_argument_split_issue3",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
)
def test_mapped_argument_split():
    """Reproduce the MappedArgument split error"""
    input_files = [
        "s3://bucket/path/to/file1.csv",
        "s3://bucket/path/to/file2.csv",
        "s3://bucket/path/to/file3.csv",
    ]

    @task_group()
    def test_group(
            input_file: str,
    ) -> CustomOperator:
        return CustomOperator(input_file=f"{input_file}", task_id="custom_operator")

    test_group.expand(input_file=input_files)

Will print:

file: toto/MappedArgument(_input=DictOfListsExpandInput(value={'input_file': ['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv', 's3://bucket/path/to/file3.csv']}), _key='input_file')
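
A hedged reading of why the f-string variant reproduces this (my assumption, not a confirmed diagnosis): inside a mapped task group the group function seems to receive a placeholder reference rather than the concrete value, and the f-string stringifies that placeholder immediately instead of leaving resolution to the mapping machinery. A toy illustration with a stand-in class (Placeholder is not an Airflow class):

class Placeholder:
    # Stands in for the reference object a mapped task group passes to its body.
    def __repr__(self) -> str:
        return "MappedArgument(...)"

input_file = Placeholder()

# The f-string formats the placeholder eagerly, before any per-map-index
# resolution can happen, which matches the output above.
print(f"toto/{input_file}")  # -> toto/MappedArgument(...)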

tmaurin avatar Oct 30 '25 19:10 tmaurin

I want to take a shot at this; can one of the maintainers assign it to me, please? šŸ™Œ

akkik04 avatar Dec 06 '25 23:12 akkik04

Put up a PR for this, looking forward to getting some eyes on it. šŸ™Œ cc: @ephraimbuddy @kaxil

akkik04 avatar Dec 11 '25 06:12 akkik04