
Issue found creating a dataframe from a quoted "illegal" table name.

Open · dimberman opened this issue on Mar 14, 2022 · 3 comments

Describe the bug

We found a bug when a DAG has an "invalid" dag_id (for example one containing "=") and a table produced in that DAG is then passed to a dataframe-decorated function.

Version

  • Astro: [e.g. 0.6.0]
  • OS: [eg. Debian]

To Reproduce

Steps to reproduce the behavior:


# Force an "invalid" dag_id containing a character that is illegal in an
# unquoted SQL identifier.
sample_dag.dag_id = "my=dag"

@adf
def validate(df: pd.DataFrame):
    # The decorated function receives the upstream table as a pandas DataFrame.
    assert len(df) == 12
    assert df.iloc[0].to_dict()["first_name"] == "PENELOPE"

@aql.transform()
def pg_query(input_table: Table):
    return "SELECT * FROM {{input_table}} WHERE last_name LIKE 'G%%'"

with sample_dag:
    pg_table = pg_query(
        input_table=Table(
            table_name="actor", conn_id="postgres_conn", database="pagila"
        )
    )
    validate(pg_table)
test_utils.run_dag(sample_dag)
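
For completeness, the snippet above omits its imports. A hedged reconstruction is below: the adf and aql aliases are inferred from the decorators used above and the exact module paths may differ between astro versions; sample_dag is a DAG fixture and test_utils a helper module from the test suite.

import pandas as pd

# Assumed astro imports; aliases inferred from the snippet above and may
# not match every astro release.
from astro import sql as aql
from astro import dataframe as adf
from astro.sql.table import Table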

When running this DAG against the pagila Postgres image, we get the following error:

  File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1329, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1455, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1511, in _execute_task
    result = execute_callable(context=context)
  File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/src/astro/sql/operators/sql_dataframe.py", line 82, in execute
    self.handle_op_args()
  File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/src/astro/sql/operators/sql_dataframe.py", line 66, in handle_op_args
    ret_args.append(self._get_dataframe(arg))
  File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/src/astro/sql/operators/sql_dataframe.py", line 143, in _get_dataframe
    return self.hook.get_pandas_df(query)
  File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/hooks/dbapi.py", line 138, in get_pandas_df
    return psql.read_sql(sql, con=conn, params=parameters, **kwargs)
  File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/pandas/io/sql.py", line 602, in read_sql
    return pandas_sql.read_query(
  File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/pandas/io/sql.py", line 2116, in read_query
    cursor = self.execute(*args)
  File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/pandas/io/sql.py", line 2068, in execute
    raise ex from exc
pandas.io.sql.DatabaseError: Execution failed on sql 'SELECT * FROM "astroflow_ci"."""my=dag_pg_query_1"""': relation "astroflow_ci."my=dag_pg_query_1"" does not exist
LINE 1: SELECT * FROM "astroflow_ci"."""my=dag_pg_query_1"""

This suggests a problem with identifier quoting: the intermediate table name appears to be quoted once when it is generated and then quoted again when the SELECT is rendered, which produces the tripled double quotes above. A minimal sketch of that double-quoting follows.
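
The sketch below is plain Python with no astro internals; the quote_ident helper is hypothetical and only illustrates how quoting an already-quoted identifier a second time reproduces the exact statement seen in the traceback.

def quote_ident(name: str) -> str:
    # Standard SQL identifier quoting: wrap in double quotes and
    # double any embedded double quotes.
    return '"' + name.replace('"', '""') + '"'

# Intermediate table name derived from dag_id "my=dag" and task_id "pg_query".
table_name = 'my=dag_pg_query_1'
once = quote_ident(table_name)   # '"my=dag_pg_query_1"'
twice = quote_ident(once)        # '"""my=dag_pg_query_1"""'
print(f'SELECT * FROM {quote_ident("astroflow_ci")}.{twice}')
# SELECT * FROM "astroflow_ci"."""my=dag_pg_query_1"""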

We have marked the test test_with_invalid_dag_name as xfail; whoever picks up this ticket can use that test to guide the bug fix.
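
For reference, the xfail marker looks roughly like this; only the test name comes from this issue, and the reason string and test body are illustrative:

import pytest

@pytest.mark.xfail(reason="quoting bug for table names derived from invalid dag_ids")
def test_with_invalid_dag_name(sample_dag):
    # Build the DAG from the snippet above and run it with test_utils.run_dag.
    ...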

Expected behavior The test should pass and postgres should be able to pull in a dataframe.


dimberman avatar Mar 14 '22 21:03 dimberman

The issue isn't with the table name. If the test runs twice, create_table_name will create a table name with an incremented dag_run id; a rough sketch of the naming scheme follows.
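
The composition below is a guess based on the name seen in the traceback, not create_table_name's actual implementation:

dag_id, task_id, dag_run_count = "my=dag", "pg_query", 1
table_name = f"{dag_id}_{task_id}_{dag_run_count}"   # -> "my=dag_pg_query_1"
# A second run of the test increments the counter, so it would look for
# "my=dag_pg_query_2" instead of reusing the table created above.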

bhavaniravi avatar Mar 15 '22 05:03 bhavaniravi

There are two ways I can think of resolving this.

  1. By getting the latest DagRun of the DAG and using it to generate the intermediate table name:
def get_latest_dag_run(dag):
    last_dagrun = dag.get_last_dagrun(include_externally_triggered=True)
    return last_dagrun
  2. By truncating the dag_run and task_instance tables in tearDown (see the fixture sketch after this list):
            session.execute("TRUNCATE TABLE dag_run RESTART IDENTITY CASCADE;")
            session.execute("TRUNCATE TABLE task_instance RESTART IDENTITY CASCADE;")

Approach 2 might cause issues when running multiple tests in parallel. Approach 1 should ensure each test case gets a unique dag_id, which we can enforce with the new conftest method in setup.

Shall I go ahead with 1?

cc: @tatiana

bhavaniravi avatar Mar 16 '22 05:03 bhavaniravi

Great analysis, looks promising, @bhavaniravi!

tatiana avatar Mar 16 '22 07:03 tatiana

We need to verify this before closing.

sunank200 avatar Jan 19 '23 09:01 sunank200