
DB Parameters not passed through to Snowflake API Connection

Open alexcsolomon1 opened this issue 1 year ago • 2 comments

Apache Airflow Provider(s)

snowflake

Versions of Apache Airflow Providers

apache-airflow-providers-snowflake == 5.5.0

Apache Airflow version

2.9.1

Operating System

macOS

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

When using the SnowflakeSqlApiOperator and specifying a DB parameter such as schema to override what is in the Airflow connection, the DB parameters are ignored.

For example, if the schema retrieved via the snowflake_conn_id is DEFAULT, but the SnowflakeSqlApiOperator is instantiated with the argument schema='OTHER', the query still executes against the API with the DEFAULT schema.

The same is true for role, warehouse, etc.

What you think should happen instead

In this case, the query should execute with the schema specified in the arguments of SnowflakeSqlApiOperator, which should take precedence over what is in the connection referenced by snowflake_conn_id.

How to reproduce

Assume that in Snowflake you have a schema structure like:

Database:

  • DEFAULT schema
    • Table1
  • OTHER schema
    • Table2

Then in Airflow, the connection object 'db_conn' contains {..., "schema": "DEFAULT", ...}.
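For reference, a minimal sketch of one way to create that connection programmatically (the login and password values here are placeholders, not taken from this issue):

from airflow.models import Connection
from airflow.settings import Session

# Register 'db_conn' with DEFAULT as its schema; the operator argument
# in the DAG below is expected to override this value.
conn = Connection(
    conn_id="db_conn",
    conn_type="snowflake",
    schema="DEFAULT",
    login="placeholder_user",     # placeholder
    password="placeholder_pass",  # placeholder
)
session = Session()
session.add(conn)
session.commit()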

Finally, the DAG code contains:

from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

default_args = {"owner": "airflow"}  # minimal default_args for the repro

dag = DAG(
    'repro_dag',
    default_args=default_args,
    description='A DAG to demonstrate the SnowflakeSqlApiOperator DB params bug',
    schedule_interval=None,
    start_date=datetime(2024, 5, 14),
)

repro_task = SnowflakeSqlApiOperator(
    task_id="repro_task",
    sql="select * from Table2;",  # Table2 lives only in the OTHER schema
    snowflake_conn_id="db_conn",
    dag=dag,
    schema="OTHER",  # expected to override the connection's DEFAULT schema
)

This will fail with

SQL compilation error:
Table 'TABLE2' does not exist or not authorized.

Further debugging (checking conn_config['schema'] in the hook's execute_query) indicates this is because the schema is still set to DEFAULT in the request posted to the API.

Anything else

hook_params is set in SnowflakeSqlApiOperator's __init__, a pattern similar to the one used in the base Airflow SQL operator. However, in the case of the SnowflakeSqlApiOperator, these hook_params are never used when creating the hook.
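For contrast, here is a simplified paraphrase of how the hook is instantiated today (not verbatim provider source, but it reflects the behavior described above):

self._hook = SnowflakeSqlApiHook(
    snowflake_conn_id=self.snowflake_conn_id,
    token_life_time=self.token_life_time,
    token_renewal_delta=self.token_renewal_delta,
    deferrable=self.deferrable,
    # self.hook_params is never forwarded here, so schema/warehouse/role
    # overrides set on the operator are silently dropped
)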

I think this issue can be solved by modifying the instantiation of the SnowflakeSqlApiHook to pass the hook_params as kwargs:

self._hook = SnowflakeSqlApiHook(
    snowflake_conn_id=self.snowflake_conn_id,
    token_life_time=self.token_life_time,
    token_renewal_delta=self.token_renewal_delta,
    deferrable=self.deferrable,
    **self.hook_params,  # forwards schema, warehouse, role, etc. to the hook
)
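If this fix behaves as expected, an operator instantiated with schema='OTHER' (as in the repro above) should see that value reach the hook's connection config and take precedence over the connection's DEFAULT schema, and the same should hold for warehouse, role, and the other DB parameters.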

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct

alexcsolomon1 · May 14 '24 16:05

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; no need to wait for approval.

boring-cyborg[bot] · May 14 '24 16:05

@alexcsolomon1 All yours, feel free to submit a PR! Thanks for logging the issue.

josh-fell · May 15 '24 02:05