Dynamic mapped tasks group arguments are interpreted as MappedArgument when provided to classic operators

Open florian-guily opened this issue 1 year ago • 7 comments

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.4

What happened?

When expand_kwargs is used on a task group, the group's arguments are not resolved when they are used in classic operators inside that group. The operators receive MappedArgument objects instead of the real values.

What you think should happen instead?

The real values of the mapped task group's arguments should be passed to the operators.

How to reproduce

This was originally done with a Google Cloud operator in a task group, but I managed to reproduce it with a BashOperator.

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.operators.bash import BashOperator
from pendulum import datetime


dag = DAG(
    dag_id="airflow_issue_test",
    start_date=datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
    schedule="@daily",
    default_args={"retries": 2},
)

with dag:

    @task()
    def list_dict_generator():
        my_list = [
            {
                "project": "my_project",
                "dataset": f"dataset_{number}",
                # Key must match the task group's parameter name (table_name)
                "table_name": "my_table",
                "partition_id": "my_partition_id",
            }
            for number in range(10)
        ]
        return my_list
    
    @task_group()
    def my_tg(project, dataset, table_name, partition_id): 
        BashOperator(
            task_id="bash_task",
            bash_command=f"echo '{project}.{dataset}.{table_name}${partition_id}'",
            env={"MY_VAR": "Hello World"}
        )
    
    partitions_to_delete = list_dict_generator()

    my_tg.expand_kwargs(partitions_to_delete)

Here are the associated logs. They clearly show several MappedArgument objects being passed into the echo command, which should not happen: dag_id=airflow_issue_test_run_id=scheduled__2024-04-14T00_00_00+00_00_task_id=bq_to_gcs_tg.bash_task_map_index=0_attempt=2.log
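A rough way to see why this happens, assuming (as the logs suggest) that inside a mapped task group each parameter is a placeholder object that Airflow only resolves when a task actually runs: the f-string in bash_command is evaluated once, when the DAG file is parsed, so it captures the placeholder's text rather than a value. The sketch below uses a hypothetical `Placeholder` class as a stand-in for MappedArgument; it is a pure-Python illustration of parse-time versus run-time evaluation, not Airflow code.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class Placeholder:
    """Hypothetical stand-in for a lazily resolved argument such as MappedArgument."""

    key: str

    def resolve(self, row: Dict[str, Any]) -> Any:
        # The real value is only looked up once a concrete mapped row exists.
        return row[self.key]


def define_eager(project: Placeholder) -> str:
    # Like an f-string passed to an operator constructor: formatted at
    # "parse time", so the placeholder's repr is baked into the string.
    return f"echo {project}"


def define_deferred(project: Placeholder) -> Callable[[Dict[str, Any]], str]:
    # Like a TaskFlow function body: the string is built at "run time",
    # after the placeholder has been resolved against the mapped row.
    return lambda row: f"echo {project.resolve(row)}"


if __name__ == "__main__":
    project = Placeholder("project")
    row = {"project": "my_project"}
    print(define_eager(project))          # echo Placeholder(key='project')
    print(define_deferred(project)(row))  # echo my_project
```

This mirrors what the later comments observe: values used inside a TaskFlow function body arrive resolved, while values formatted into classic operator arguments at definition time do not.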

Operating System

macOS Sonoma 14.2.1 (23C71)

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.16.0
apache-airflow-providers-common-sql>=1.11.0

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

florian-guily, Apr 24 '24 08:04

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot], Apr 24 '24 08:04

@florian-guily Would you like to be assigned to this issue?

RNHTTR, Apr 24 '24 19:04

I'd like to, yes. I'll find the time to resolve it!

florian-guily, Apr 24 '24 20:04

Hello there, I've just encountered the same issue. It doesn't happen only with classic operators, but with anything in a task group's scope, such as:

@task_group
def process_model_requests(model_name):
    config_results = prepare_pod_config(task_id=f"config_for_{model_name}")
    result_paths = prepare_pods(config_results, f"for_{model_name}")
    config_results >> result_paths

I'm on version 2.6.3, though.

renzo-sanchez-h, May 07 '24 17:05

Interesting, but not anything in a task group: a Python operator created with the TaskFlow API seems to work well. I haven't tried with the classic PythonOperator.

florian-guily, May 08 '24 01:05

Yes, sorry, I meant when using dynamic task mapping, as in the original post.

prefix = "process"
task_params = [{"task_id": "_".join([prefix, str(i + 1)]), "model_name": f"model_{i}"} for i in range(3)]
process_model_requests.expand_kwargs(task_params)

renzo-sanchez-h, May 08 '24 16:05

I'm noticing this issue as well. I tested a task group with the TaskFlow API and the arguments are resolved correctly, but when they are passed to traditional operators they are not.

joshtree41, May 14 '24 00:05

More examples:

  1. https://github.com/apache/airflow/issues/31481
  2. https://github.com/apache/airflow/discussions/39927

Locustv2, May 31 '24 14:05

Is there a known workaround in the meantime, or an update on this, @florian-guily?

d-callan, Jul 09 '24 14:07

I didn't find a proper workaround for non-TaskFlow operators, so I just used the Python operator for everything.

I'm investigating a fix, but I'm fairly new to open-source contribution, so I'm a bit slow, and I also need to find time for it.

florian-guily, Jul 09 '24 14:07

IMO, the only workaround is to use TaskFlow or to not use mapped arguments.
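One way to read the second option, assuming the goal is just one BashOperator run per row: build the complete command strings in an upstream TaskFlow task and map the classic operator directly with `.partial(...).expand(...)`, instead of mapping the whole task group. Only the string-building part below is runnable; `build_commands` and `make_commands` are hypothetical names, each row is assumed to carry the keys `project`, `dataset`, `table_name` and `partition_id`, and the Airflow wiring in the trailing comment is an untested outline.

```python
import shlex
from typing import Dict, List


def build_commands(rows: List[Dict[str, str]]) -> List[str]:
    """Turn each row dict into one complete, shell-quoted echo command."""
    commands = []
    for row in rows:
        target = f"{row['project']}.{row['dataset']}.{row['table_name']}${row['partition_id']}"
        # shlex.quote wraps the value in single quotes when needed, so bash
        # does not expand the "$" in the partition suffix.
        commands.append(f"echo {shlex.quote(target)}")
    return commands


# Outline of the DAG wiring (untested, not executed here):
#
#     @task
#     def make_commands(rows):
#         return build_commands(rows)
#
#     BashOperator.partial(task_id="bash_task").expand(
#         bash_command=make_commands(list_dict_generator())
#     )
```

Because every command is a plain string by the time the operator is expanded, the classic operator never sees a MappedArgument; the trade-off is that the per-row grouping of several tasks is lost.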

Locustv2, Jul 11 '24 08:07

Yes. You can simply use Python subprocess.* calls to run a bash script inside such a TaskFlow-decorated function; there is no particular need to use BashOperator if you can run the bash command yourself.
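A minimal sketch of that approach, assuming the same four row fields as the reproduction: the command is assembled inside the function body, which in a real DAG would be decorated with `@task` and called inside the task group, so it only runs once the mapped values are real strings. The decorator is left out so the snippet runs without Airflow, and `run_echo` is a hypothetical name.

```python
import subprocess


def run_echo(project: str, dataset: str, table_name: str, partition_id: str) -> str:
    """Run echo through subprocess and return its output (decorate with @task in a DAG)."""
    target = f"{project}.{dataset}.{table_name}${partition_id}"
    # Passing an argument list (no shell=True) means no shell parses the
    # command, so the "$" in the partition suffix is printed literally.
    result = subprocess.run(
        ["echo", target],
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if the command fails
    )
    return result.stdout.strip()
```

Returning the output also pushes it to XCom when the function is used as a TaskFlow task, which can replace BashOperator's own XCom push if downstream tasks need it.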

potiuk, Jul 11 '24 13:07

Converting it to a discussion, as it is unlikely to be an "airflow issue".

potiuk, Jul 11 '24 13:07