airflow
airflow copied to clipboard
Params Fail to Evaluate In Dag Body
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.7.2
What happened?
When providing airflow params, the parameter is not available in the body of the airflow dag.
What you think should happen instead?
The value should be available in the airflow dag
How to reproduce
from airflow import DAG
from airflow.models.param import Param
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
with DAG("generate_password_dag", params={"test": Param("defeault", type="string")}) as dag:
sql_operator = SQLExecuteQueryOperator(
task_id="sql_operator",
sql="sql/{{ params.test }}.sql",
database="default",
)
Operating System
NA
Versions of Apache Airflow Providers
NA
Deployment
Official Apache Airflow Helm Chart
Deployment details
NA
Anything else?
NA
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
This rendered because you have .sql suffix in the end of your string. In this case it will tried to read template from the template file without rendering file path
Thanks for your response @Taragolis! For my use-case, I'd like to determine the SQL file with a param provided in the UI
You can use Jinja's include tag to render a file's contents.
For example, using this SQL file;
# dags/sql/default.sql
SELECT 'from a templated file';
and this task definition:
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
with DAG(
"generate_password_dag",
params={"test": Param("default", type="string")},
):
sql_operator = SQLExecuteQueryOperator(
task_id="sql_operator",
sql="{% include 'sql/' ~ params.test ~ '.sql' %}",
conn_id="postgres",
)
In the above task definition, ~ is string concatenation in Jinja.
Task log:
@oliver-helix Can you try that approach and see if it works please?
Thank you @josh-fell! This indeed works. Curious why sql="{% include 'sql/' ~ params.test ~ '.sql' %}" works when sql="sql/{{ params.test }}.sql" does not?
works when sql="sql/{{ params.test }}.sql" does not?
It doesn't work by design, see: https://github.com/apache/airflow/issues/39651#issuecomment-2113385429
Curious why sql="{% include 'sql/' ~ params.test ~ '.sql' %}" works
It is just a Jinja statement which rendered