Params Fail to Evaluate In Dag Body

Open oliver-helix opened this issue 1 year ago • 3 comments

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.2

What happened?

When a DAG defines Airflow params, the param value is not rendered in the operator's sql argument in the body of the DAG.

What you think should happen instead?

The param value should be available in the DAG.

How to reproduce

from airflow import DAG
from airflow.models.param import Param
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG("generate_password_dag", params={"test": Param("default", type="string")}) as dag:
    sql_operator = SQLExecuteQueryOperator(
        task_id="sql_operator",
        sql="sql/{{ params.test }}.sql",
        database="default",
    )

Operating System

NA

Versions of Apache Airflow Providers

NA

Deployment

Official Apache Airflow Helm Chart

Deployment details

NA

Anything else?

NA

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

oliver-helix avatar May 15 '24 20:05 oliver-helix

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot] avatar May 15 '24 20:05 boring-cyborg[bot]

This happens because your string ends with the .sql suffix. In that case Airflow treats the value as the path to a template file and tries to read that file, without first rendering the file path itself.
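The rule described above can be sketched in plain Python. This is a simplified illustration, not Airflow's actual code: the function name `classify_templated_value` and the constant `TEMPLATE_EXT` are hypothetical, and the only extension modeled is the `.sql` suffix mentioned in this comment.

```python
from typing import Sequence

# Assumption: only the ".sql" suffix discussed in this thread is modeled here.
TEMPLATE_EXT: Sequence[str] = (".sql",)


def classify_templated_value(value: str, template_ext: Sequence[str] = TEMPLATE_EXT) -> str:
    """Hypothetical helper: report how a templated field value would be handled.

    Returns "file" when the raw string ends with a template extension, meaning
    it is looked up as a template file path and the path is NOT rendered first.
    Returns "inline" when the string itself is rendered as a Jinja template.
    """
    # The check runs on the raw, unrendered string.
    if value.endswith(tuple(template_ext)):
        return "file"
    return "inline"


# Ends with ".sql": treated as a literal file path, so {{ params.test }} stays unrendered.
print(classify_templated_value("sql/{{ params.test }}.sql"))

# Does not end with ".sql": the string is rendered as an inline template.
print(classify_templated_value("SELECT '{{ params.test }}'"))
```

Under this sketch, the value from the report is classified as a file path, which is why Airflow goes looking for a file literally named `sql/{{ params.test }}.sql`.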

Taragolis avatar May 15 '24 20:05 Taragolis

Thanks for your response @Taragolis! For my use-case, I'd like to determine the SQL file with a param provided in the UI

oliver-helix avatar May 16 '24 19:05 oliver-helix

You can use Jinja's include tag to render a file's contents.

For example, using this SQL file:

-- dags/sql/default.sql
SELECT 'from a templated file';

and this task definition:

from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    "generate_password_dag",
    params={"test": Param("default", type="string")},
):
    sql_operator = SQLExecuteQueryOperator(
        task_id="sql_operator",
        sql="{% include 'sql/' ~ params.test ~ '.sql' %}",
        conn_id="postgres",
    )

In the above task definition, ~ is string concatenation in Jinja.

Task log: image

@oliver-helix Can you try that approach and see if it works please?

josh-fell avatar May 17 '24 01:05 josh-fell

Thank you @josh-fell! This indeed works. Curious why sql="{% include 'sql/' ~ params.test ~ '.sql' %}" works when sql="sql/{{ params.test }}.sql" does not?

oliver-helix avatar May 17 '24 02:05 oliver-helix

works when sql="sql/{{ params.test }}.sql" does not?

It doesn't work by design, see: https://github.com/apache/airflow/issues/39651#issuecomment-2113385429

Curious why sql="{% include 'sql/' ~ params.test ~ '.sql' %}" works

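It is just a Jinja statement that gets rendered. The string does not end with .sql, so it is rendered as an inline template, and the include tag pulls in the file's contents at render time.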

Taragolis avatar May 17 '24 11:05 Taragolis