
Dynamic mapping over task group is broken with multiple_outputs

Open tanelk opened this issue 1 year ago • 5 comments

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

When a task returns multiple outputs (multiple_outputs=True) and task groups are mapped over the individual values, the number of expanded tasks is not the length of the selected list but the number of keys in the returned dictionary.

What you think should happen instead?

No response

How to reproduce

Minimal DAG to reproduce:

from airflow import DAG
from airflow.decorators import task_group, task
from datetime import datetime

@task(multiple_outputs=True)
def list_values():
    return {
        'values1': [1],
        'values2': [2, 3, 4],
    }

@task
def subtask(value):
    print(value)

@task_group
def process(value):
    subtask(value=value)

with DAG(
    dag_id='test_dag',
    schedule_interval="*/1 * * * *",
    start_date=datetime(2024, 1, 29),
    max_active_runs=1,
) as dag:

    values = list_values()

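    # Expected: 1 mapped subtask (values['values1'] has 1 element)
    process.override(group_id='process1').expand(value=values['values1'])
    # Expected: 3 mapped subtasks (values['values2'] has 3 elements)
    process.override(group_id='process2').expand(value=values['values2'])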

The subtask in process1 should have 1 mapped task and the one in process2 should have 3 mapped tasks. Instead, there are 2 mapped tasks in both task groups, which matches the number of keys in the returned dictionary.

The 2 tasks in process2 print the correct values (2 and 3), but the third value (4) is never processed. The first task in process1 prints the correct value (1), and the second one fails with this exception:

[2024-01-29, 14:54:11 UTC] {abstractoperator.py:707} ERROR - Exception rendering Jinja template for task 'process1.subtask', field 'op_kwargs'. Template: {'value': MappedArgument(_input=DictOfListsExpandInput(value={'value': XComArg(, 'values1')}), _key='value')}
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/abstractoperator.py", line 699, in _do_render_template_fields
    rendered_content = self.render_template(
  File "/opt/airflow/airflow/template/templater.py", line 186, in render_template
    return {k: self.render_template(v, context, jinja_env, oids) for k, v in value.items()}
  File "/opt/airflow/airflow/template/templater.py", line 186, in <dictcomp>
    return {k: self.render_template(v, context, jinja_env, oids) for k, v in value.items()}
  File "/opt/airflow/airflow/template/templater.py", line 176, in render_template
    return value.resolve(context)
  File "/opt/airflow/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/models/expandinput.py", line 70, in resolve
    data, _ = self._input.resolve(context, session=session)
  File "/opt/airflow/airflow/models/expandinput.py", line 201, in resolve
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
  File "/opt/airflow/airflow/models/expandinput.py", line 201, in <dictcomp>
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
  File "/opt/airflow/airflow/models/expandinput.py", line 185, in _expand_mapped_field
    return value[found_index]
IndexError: list index out of range
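The symptoms above can be reproduced with a small pure-Python model. This is not Airflow code: `mapped_length`, `resolve_mapped_value` and `run_group` are hypothetical stand-ins, assuming (as the traceback suggests) that the number of mapped instances is taken from the length of the whole returned dictionary, while each instance then indexes into the selected list.

```python
from typing import Any, Dict, List

# Hypothetical stand-in for the return value of list_values() (multiple_outputs=True).
upstream: Dict[str, List[int]] = {
    "values1": [1],
    "values2": [2, 3, 4],
}


def mapped_length(return_value: Dict[str, List[int]]) -> int:
    # Assumed buggy behavior: the length comes from the whole dict,
    # i.e. the number of keys, not the length of the selected list.
    return len(return_value)


def resolve_mapped_value(return_value: Dict[str, List[int]], key: str, map_index: int) -> Any:
    # Each mapped instance picks element `map_index` of the selected list,
    # mirroring `return value[found_index]` in the traceback.
    return return_value[key][map_index]


def run_group(key: str) -> List[str]:
    # Simulate every mapped instance of one task group.
    results = []
    for i in range(mapped_length(upstream)):
        try:
            results.append(f"printed {resolve_mapped_value(upstream, key, i)}")
        except IndexError as exc:
            results.append(f"IndexError: {exc}")
    return results


print(run_group("values1"))  # ['printed 1', 'IndexError: list index out of range']
print(run_group("values2"))  # ['printed 2', 'printed 3']  (4 is never reached)
```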

A screenshot of the DAG was attached (image not preserved).

Operating System

Arch Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

Tried with a virtualenv install of v2.8.1 and also with Breeze on the main branch.

Anything else?

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

tanelk, Jan 29 '24

I am not sure, but I feel this is a misinterpretation of how mapping works. The values returned from task list_values are mapped already at parse time, because the subscript access (values['values1']) is not evaluated at DAG scheduling time. I believe it is broken by default.

To achieve the result you are aiming for, I think you need to implement a custom map() function so that the expansion takes place at scheduling time, not parse time. (The way you express it, the reference to the task's output is used as an array and mapped as an array.)

jscheffl, Jan 29 '24

I looked into it a bit more, and the issue seems to lie in the design of airflow.models.taskmap.TaskMap. The airflow.models.taskinstance._record_task_map_for_downstreams method ignores the multiple_outputs setting and creates a single TaskMap from the whole dictionary, so the recorded length is the number of keys. It should probably create one TaskMap per key, but that would be quite a significant change.
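A rough sketch of the suggested direction, assuming a simplified task map that stores only a length. `SimpleTaskMap`, `record_single` and `record_per_key` are hypothetical names for illustration; they do not reproduce the real `TaskMap` schema or the `_record_task_map_for_downstreams` signature.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class SimpleTaskMap:
    # Hypothetical, simplified record: upstream task, XCom key and length only.
    task_id: str
    xcom_key: Optional[str]  # None stands for the whole return value
    length: int


def record_single(task_id: str, value: dict) -> List[SimpleTaskMap]:
    # Behavior described above: one map for the whole dict, length = number of keys.
    return [SimpleTaskMap(task_id, None, len(value))]


def record_per_key(task_id: str, value: dict) -> List[SimpleTaskMap]:
    # Suggested direction: keep the whole-value map and add one map per key
    # whose value is mappable (a list or dict), so an expansion over
    # values['values1'] can look up the length of that key specifically.
    maps = [SimpleTaskMap(task_id, None, len(value))]
    for key, item in value.items():
        if isinstance(item, (list, dict)):
            maps.append(SimpleTaskMap(task_id, key, len(item)))
    return maps


ret = {"values1": [1], "values2": [2, 3, 4]}
print(record_single("list_values", ret))
lengths = {m.xcom_key: m.length for m in record_per_key("list_values", ret)}
print(lengths)  # {None: 2, 'values1': 1, 'values2': 3}
```

Keeping the whole-value entry alongside the per-key entries means an expansion over the full return value would still see the old length.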

I found a simple workaround for the bug: a minimal Python task that just returns its input value:

@task
def pass_through(values):
    # Re-publish the selected list as this task's own return value
    return values

And then use it like this:

process.override(group_id='process1').expand(value=pass_through(values['values1']))
process.override(group_id='process2').expand(value=pass_through(values['values2']))
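Why the workaround helps, in simplified terms: pass_through pushes the selected list as its own return value, so the length recorded for its downstream is the length of that list. The functions below are a hypothetical pure-Python model, assuming the mapped length is `len()` of the upstream task's return value; they are not Airflow code.

```python
def recorded_length(return_value) -> int:
    # Model: the mapped length is len() of the upstream task's return value.
    return len(return_value)


def pass_through(values):
    # Same body as the workaround task: return the input unchanged.
    return values


list_values_result = {"values1": [1], "values2": [2, 3, 4]}

# Expanding directly: the length comes from list_values' whole dict (2 keys).
direct = recorded_length(list_values_result)
# Via pass_through: the length comes from pass_through's own return value.
via_1 = recorded_length(pass_through(list_values_result["values1"]))
via_2 = recorded_length(pass_through(list_values_result["values2"]))
print(direct, via_1, via_2)  # 2 1 3
```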

tanelk, Jan 30 '24

The same issue occurs when not using task groups, but it is handled a bit more gracefully: https://github.com/apache/airflow/blob/8914e49551d8ae5ece7418950b011c1f338b4634/airflow/models/mappedoperator.py#L120-L134
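Assuming the linked lines are a parse-time guard that rejects mapping over an XCom with a custom key (values['values1'] refers to the values1 key rather than the default return_value key), such a guard can be modeled in plain Python. `Ref` and `check_mappable` are hypothetical stand-ins for XComArg and the real check, which may differ in detail; judging by the comment above, task-group expansion does not go through an equivalent check.

```python
from dataclasses import dataclass
from typing import Any

RETURN_KEY = "return_value"  # Airflow's default XCom key for a task's return value


@dataclass(frozen=True)
class Ref:
    # Hypothetical stand-in for an XComArg: which task and which XCom key.
    task_id: str
    key: str = RETURN_KEY


def check_mappable(arg: Any) -> None:
    # Walk the expand() argument and fail early on references to custom keys.
    if isinstance(arg, Ref):
        if arg.key != RETURN_KEY:
            raise ValueError(f"cannot map over XCom with custom key {arg.key!r} from {arg.task_id}")
    elif isinstance(arg, dict):
        for v in arg.values():
            check_mappable(v)
    elif isinstance(arg, (list, tuple)):
        for v in arg:
            check_mappable(v)


check_mappable({"value": Ref("list_values")})  # passes: default return value
try:
    check_mappable({"value": Ref("list_values", "values1")})
except ValueError as exc:
    print(exc)  # cannot map over XCom with custom key 'values1' from list_values
```

Failing at parse time surfaces the problem when the DAG is loaded instead of as an IndexError in one of the mapped instances.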

tanelk, Jan 30 '24

You are probably right. While rechecking the docs and code, I remembered that a LazyXComAccess object is generated here (see https://airflow.apache.org/docs/apache-airflow/2.5.2/authoring-and-scheduling/dynamic-task-mapping.html#simple-mapping).

Maybe the simple workaround is really the best because I am also not sure how to correctly tell the LazyXComAccess to reference one key of the returned XComs.

I'd propose to make it explicit with:

@task
def pass_through1(values):
    return values['values1']

...and then followed by...

process.override(group_id='process1').expand(value=pass_through1(values))

...otherwise, as the note in the docs describes, the expansion will probably generate warnings?

jscheffl, Jan 30 '24

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions[bot], Feb 14 '24

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions[bot], Mar 06 '24

This issue has been closed because it has not received a response from the issue author.

github-actions[bot], Mar 14 '24