DB Parameters not passed through to Snowflake API Connection
Apache Airflow Provider(s)
snowflake
Versions of Apache Airflow Providers
apache-airflow-providers-snowflake == 5.5.0
Apache Airflow version
2.9.1
Operating System
macOS
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
What happened
When using the SnowflakeSqlApiOperator and specifying a DB parameter such as schema to override the value in the Airflow connection, the parameter is ignored.
For example, if the connection referenced by snowflake_conn_id specifies the schema DEFAULT, but the SnowflakeSqlApiOperator is instantiated with the argument schema='OTHER', the query still executes against the API with the DEFAULT schema.
The same is true for role, warehouse, etc.
What you think should happen instead
In this case, the query should execute with the schema specified in the arguments of SnowflakeSqlApiOperator, which should take precedence over the schema stored in the connection referenced by snowflake_conn_id.
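The expected precedence can be sketched without Airflow. The helper below is hypothetical (resolve_db_params is not a provider function, and the warehouse and role values are made up for illustration); it only shows that a value passed to the operator should win over the value stored in the connection, while unset arguments fall back to the connection.

```python
from typing import Optional


def resolve_db_params(conn_params: dict, **overrides: Optional[str]) -> dict:
    """Merge connection-level parameters with operator-level overrides.

    Hypothetical helper for illustration only; not part of the provider.
    """
    resolved = dict(conn_params)
    for key, value in overrides.items():
        # Only explicitly set arguments override the connection value.
        if value is not None:
            resolved[key] = value
    return resolved


# Illustrative connection values; only "schema" comes from the report.
conn_params = {"schema": "DEFAULT", "warehouse": "WH", "role": "ANALYST"}
resolved = resolve_db_params(conn_params, schema="OTHER", role=None)
print(resolved)
```

With this rule, schema resolves to OTHER, while warehouse and role keep their connection values because no override was given for them.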
How to reproduce
Assume that in Snowflake you have a schema structure like:
Database:
- DEFAULT schema
  - Table1
- OTHER schema
  - Table2
Then, in Airflow, the connection object 'db_conn' contains the key: {.... "schema": "DEFAULT"....}
Finally, DAG code contains
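from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# default_args is assumed to be defined elsewhere; it is not shown in this report.
dag = DAG(
    'repro_dag',
    default_args=default_args,
    description='A DAG to demonstrate the SnowflakeSqlApiOperator DB params bug',
    schedule_interval=None,
    start_date=datetime(2024, 5, 14),
)

# The connection's schema is DEFAULT; the schema argument should override it.
repro_task = SnowflakeSqlApiOperator(
    task_id="repro_task",
    sql="select * from Table2;",
    snowflake_conn_id="db_conn",
    dag=dag,
    schema="OTHER",
)

repro_task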
This will fail with
SQL compilation error:
Table 'TABLE2' does not exist or not authorized.
Further debugging (checking conn_config['schema'] in the hook's execute_query) indicates that this is because the schema is still DEFAULT when the request is posted to the API.
Anything else
hook_params are set in the SnowflakeSqlApiOperator init, following a pattern similar to the one used in the base Airflow SQL operator. However, in the case of the SnowflakeSqlApiOperator, these hook_params are never used when creating the hook.
I think this issue can be solved by modifying the instantiation of the SnowflakeSqlApiHook to pass the hook_params as kwargs:
self._hook = SnowflakeSqlApiHook(
    snowflake_conn_id=self.snowflake_conn_id,
    token_life_time=self.token_life_time,
    token_renewal_delta=self.token_renewal_delta,
    deferrable=self.deferrable,
    **self.hook_params,
)
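The effect of forwarding hook_params can be illustrated with a minimal, Airflow-free sketch. FakeHook and FakeOperator are hypothetical stand-ins, not the provider's classes, and the forward_hook_params flag exists only to contrast the current behaviour with the proposed one. The sketch assumes the hook prefers an explicitly passed schema over the connection's schema.

```python
class FakeHook:
    """Stand-in for a hook: an explicit schema wins over the connection's."""

    def __init__(self, conn_config, schema=None, **_ignored):
        self.schema = schema or conn_config["schema"]


class FakeOperator:
    """Stand-in for an operator that collects DB overrides in hook_params."""

    def __init__(self, conn_config, forward_hook_params, **hook_params):
        self.conn_config = conn_config
        self.hook_params = hook_params
        self.forward_hook_params = forward_hook_params

    def make_hook(self):
        # Current behaviour: hook_params are collected but never forwarded.
        # Proposed behaviour: unpack them into the hook constructor.
        extra = self.hook_params if self.forward_hook_params else {}
        return FakeHook(self.conn_config, **extra)


conn = {"schema": "DEFAULT"}
buggy = FakeOperator(conn, forward_hook_params=False, schema="OTHER").make_hook()
fixed = FakeOperator(conn, forward_hook_params=True, schema="OTHER").make_hook()
print(buggy.schema, fixed.schema)  # DEFAULT OTHER
```

A regression test for the real fix could follow the same shape: instantiate the operator with schema="OTHER" and assert that the hook it creates received that value.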
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
@alexcsolomon1 All yours, feel free to submit a PR! Thanks for logging the issue.