astro-sdk
Leading run_raw_sql with a comment doesn't work
Describe the bug
When using aql.run_raw_sql (tested against Snowflake), starting the SQL with a comment doesn't work, e.g.:
-- this is a test
INSERT INTO demo.customers (id, name)
VALUES
(1, 'Customer A'),
(2, 'Customer B'),
(3, 'Customer C');
However, what does work is:
INSERT INTO demo.customers (id, name)
VALUES
(1, 'Customer A'),
(2, 'Customer B'),
(3, 'Customer C');
-- this is a test
I'm not able to replicate this with aql.transform, only with aql.run_raw_sql. There are no errors; it just fails silently.
Version
- Astro: 0.9.1
@jlaneve, when I run the following code, it passes:
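import pathlib
import pytest
from airflow.decorators import task
from astro import sql as aql
from astro.constants import Database
from astro.files import File
from astro.sql.table import Table
# database_table_fixture, sample_dag and test_utils come from the astro-sdk test suite;
# cwd is assumed to be the directory of this test file.
cwd = pathlib.Path(__file__).parent
@pytest.mark.parametrize(
    "database_table_fixture",
    [
        {"database": Database.SQLITE},
    ],
    indirect=True,
    ids=["sqlite"],
)
def test_raw_sql_with_comment(database_table_fixture, sample_dag):
    _, test_table = database_table_fixture
    @aql.run_raw_sql
    def raw_sql_query(my_input_table: Table, created_table: Table, num_rows: int):
        # The leading comment sits on its own line inside a triple-quoted string.
        return """---raw sql comment
SELECT * FROM {{my_input_table}} LIMIT {{num_rows}}"""
    @task
    def validate_raw_sql(cur):
        print(cur)
    with sample_dag:
        homes_file = aql.load_file(
            input_file=File(path=str(cwd) + "/../../../data/homes.csv"),
            output_table=test_table,
        )
        raw_sql_result = (
            raw_sql_query(
                my_input_table=homes_file,
                created_table=test_table,
                num_rows=5,
                handler=lambda cur: cur.fetchall(),
            ),
        )
        validate_raw_sql(raw_sql_result)
    test_utils.run_dag(sample_dag)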
I think that if there's a comment, you need to use a triple-quoted string so that the newlines are preserved. Otherwise, the query is interpreted as a single line, everything after the -- is treated as a comment, and nothing is executed.
@kaxil, I think we can close this ticket, as I don't think this is a bug in astro-sdk, but I'd like your thoughts on whether there is a solution or warning we should consider.
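To illustrate the explanation above, here is a minimal plain-Python sketch of the single-line pitfall. It assumes the SQL is built with implicit string concatenation (no newline between the pieces) and uses a naive, hypothetical strip_line_comments helper that drops everything from -- to the end of each line; it ignores -- inside quoted literals, so it is not a real SQL parser. The hypothetical warn_if_only_comments check shows the kind of warning that could be raised when nothing executable remains; neither helper is part of astro-sdk.

```python
import warnings


def strip_line_comments(sql: str) -> str:
    """Naively remove SQL -- line comments (does not handle quoted literals)."""
    kept = []
    for line in sql.splitlines():
        code = line.split("--", 1)[0]  # keep only the text before the first --
        if code.strip():
            kept.append(code.rstrip())
    return "\n".join(kept)


def warn_if_only_comments(sql: str) -> bool:
    """Hypothetical check: warn and return True if no executable SQL remains."""
    if not strip_line_comments(sql).strip():
        warnings.warn("SQL contains only comments; nothing will be executed.")
        return True
    return False


# Implicit concatenation joins the pieces without a newline,
# so the whole query ends up on the comment line.
one_line = ("-- this is a test "
            "SELECT * FROM demo.customers")

# A triple-quoted string keeps the newline after the comment.
multi_line = """-- this is a test
SELECT * FROM demo.customers"""

print(repr(strip_line_comments(one_line)))
print(repr(strip_line_comments(multi_line)))
```

The queries later in this thread are already triple-quoted, so this pitfall alone may not explain the Snowflake report.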
Hmm, I do triple-quote it. Here, seed_customers_table_func fails:
"""
seed_data
DAG file auto-generated by AstroBuild.
"""
import time
from datetime import datetime
import pandas as pd
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.utils import timezone
from airflow.utils.dates import days_ago
from astro import sql as aql
from astro.files import File
from astro.sql.table import Table
@aql.run_raw_sql(conn_id='snowflake')
def create_customers_table_func():
    return """CREATE OR REPLACE TABLE demo.customers (
id integer,
name varchar (100)
)"""
@aql.run_raw_sql(conn_id='snowflake')
def seed_customers_table_func(create_customers_table: Table):
    return """-- manual dep: {{create_customers_table}}
INSERT INTO demo.customers (id, name)
VALUES
(1, 'Customer A'),
(2, 'Customer B'),
(3, 'Customer C');"""
@aql.transform(conn_id='snowflake')
def customers_func(seed_customers_table: Table):
    return """-- manual dep: {{seed_customers_table}}
SELECT * FROM demo.customers;"""
@dag(schedule_interval='0 0 * * *', start_date=datetime(2022, 7, 11, 17, 21, 21), catchup=False, tags=[])
def seed_data():
    create_customers_table = create_customers_table_func()
    seed_customers_table = seed_customers_table_func(create_customers_table)
    customers = customers_func(seed_customers_table, output_table=Table(conn_id='snowflake', name='tmp_astro_seed_data_customers'))
dag_obj = seed_data()
@jlaneve, can you get me the logs for these tasks? I'm wondering whether these tasks are actually failing. I created this DAG to more closely match what you're doing:
import pandas
from astro import sql as aql
from astro.sql.table import Table
# db, test_table and sample_dag are provided by the test fixtures.
@aql.run_raw_sql(conn_id=db.conn_id)
def create_customers_table_func(table: Table):
    return """CREATE TABLE {{table}} (
id integer,
name varchar (100)
)"""
@aql.run_raw_sql(conn_id=db.conn_id)
def seed_customers_table_func(create_customers_table: Table):
    return """-- manual dep: {{create_customers_table}}
INSERT INTO julian_test (id, name)
VALUES
(1, 'Customer A'),
(2, 'Customer B'),
(3, 'Customer C');"""
@aql.transform(conn_id=db.conn_id)
def customers_func(seed_customers_table: Table):
    return """-- manual dep: {{seed_customers_table}}
SELECT * FROM julian_test;"""
@aql.dataframe
def validate(df: pandas.DataFrame):
    print(df)
with sample_dag:
    create_customers_table = create_customers_table_func(test_table)
    seed_customers_table = seed_customers_table_func(create_customers_table)
    customers = customers_func(seed_customers_table)
    customers >> validate(test_table)
And this is the error I see on the second task:
Traceback (most recent call last):
File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/executors/debug_executor.py", line 79, in _run_task
ti._run_raw_task(job_id=ti.job_id, **params)
File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
return func(*args, session=session, **kwargs)
File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1451, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode)
File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1555, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2212, in render_templates
rendered_task = self.task.render_template_fields(context)
File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1185, in render_template_fields
self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
return func(*args, session=session, **kwargs)
File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 344, in _do_render_template_fields
rendered_content = self.render_template(
File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 398, in render_template
return tuple(self.render_template(element, context, jinja_env) for element in value)
File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 398, in <genexpr>
return tuple(self.render_template(element, context, jinja_env) for element in value)
File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 394, in render_template
return value.resolve(context)
File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
return func(*args, session=session, **kwargs)
File "/Users/dimberman/code/astronomer/astro-project/plugins/astro/.nox/dev/lib/python3.9/site-packages/airflow/models/xcom_arg.py", line 152, in resolve
raise AirflowException(
airflow.exceptions.AirflowException: XComArg result from create_customers_table_func at tcanuh3nxi0iixy4 with key="return_value" is not found!
This makes sense, as the previous task returned None, so there is no XCom value in the database to pull.
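A toy model of why the downstream task fails, written in plain Python rather than against Airflow itself. It assumes, consistent with the traceback above, that a task's return value is stored in XCom only when it is not None, and that resolving a reference to an upstream result raises when nothing was stored. XComStore and run_task are illustrative stand-ins, not Airflow APIs, and LookupError stands in for AirflowException.

```python
from typing import Any, Callable, Dict, Tuple


class XComStore:
    """Illustrative stand-in for the XCom table in the Airflow database."""

    def __init__(self) -> None:
        self._values: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, value: Any) -> None:
        self._values[(task_id, "return_value")] = value

    def resolve(self, task_id: str) -> Any:
        key = (task_id, "return_value")
        if key not in self._values:
            raise LookupError(
                f'XComArg result from {task_id} with key="return_value" is not found!'
            )
        return self._values[key]


def run_task(store: XComStore, task_id: str, fn: Callable[[], Any]) -> None:
    """Run a task callable and push its result only if it is not None."""
    result = fn()
    if result is not None:
        store.push(task_id, result)


store = XComStore()
# As reported above, the upstream run_raw_sql task returned None.
run_task(store, "create_customers_table_func", lambda: None)
try:
    # The downstream task templates {{create_customers_table}}, which needs this value.
    store.resolve("create_customers_table_func")
except LookupError as err:
    print(err)
```

In this model, only the tasks whose templates reference an upstream result fail; explicit >> dependencies do not need a stored value.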
Does Astro Build have a mechanism for letting users know when a task fails?
@tatiana, it also appears that this DAG works:
import pandas
from astro import sql as aql
from astro.sql.table import Table
# db and sample_dag are provided by the test fixtures.
test_table = Table(name="julian_test", conn_id=db.conn_id)
@aql.run_raw_sql(conn_id=db.conn_id)
def create_customers_table_func():
    return """CREATE TABLE julian_test (
id integer,
name varchar (100)
)"""
@aql.run_raw_sql(conn_id=db.conn_id)
def seed_customers_table_func():
    return """-- manual dep: create_customers_table
INSERT INTO julian_test (id, name)
VALUES
(1, 'Customer A'),
(2, 'Customer B'),
(3, 'Customer C');"""
@aql.transform(conn_id=db.conn_id)
def customers_func():
    return """-- manual dep: seed_customers_table
SELECT * FROM julian_test;"""
@aql.dataframe
def validate(df: pandas.DataFrame):
    print(df)
with sample_dag:
    create_customers_table = create_customers_table_func()
    seed_customers_table = seed_customers_table_func()
    customers = customers_func()
    create_customers_table >> seed_customers_table >> customers >> validate(test_table)
So if we set the dependencies manually and don't use any templating, it works fine.
This one works:
"""
jwst_temp_data
DAG file auto-generated by AstroBuild.
"""
import time
from datetime import datetime
import pandas as pd
import requests
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.utils import timezone
from airflow.utils.dates import days_ago
from astro import sql as aql
from astro.files import File
from astro.sql.table import Table
API_URL = "https://www.jwst.nasa.gov/content/webbLaunch/flightCurrentState2.0.json"
@aql.dataframe()
def fetch_data_func():
    return requests.get(API_URL).json()['currentState']
@aql.dataframe()
def metrics_func(fetch_data: pd.DataFrame):
    temp_cool_side = fetch_data['tempCoolSide1C']
    temp_warm_side = fetch_data['tempWarmSide1C']
    delta = abs(temp_cool_side - temp_warm_side)
    return pd.DataFrame([{
        "datetime": str(datetime.now()),
        "temp_cool_side": temp_cool_side,
        "temp_warm_side": temp_warm_side,
        "delta": delta,
    }])
@aql.run_raw_sql(conn_id='snowflake')
def create_or_replace_table_func():
    return """CREATE OR REPLACE TABLE demo.jwst_temps (
"datetime" VARCHAR(50),
"temp_cool_side" NUMBER,
"temp_warm_side" NUMBER,
"delta" NUMBER
)"""
@aql.run_raw_sql(conn_id='snowflake')
def insert_data_func(create_or_replace_table: Table, metrics: Table):
    return """INSERT INTO demo.jwst_temps
SELECT * FROM {{metrics}};
-- wait for: {{create_or_replace_table}}"""
@aql.transform(conn_id='snowflake')
def confirm_insert_func(insert_data: Table):
    return """SELECT * FROM demo.jwst_temps;
-- wait for: {{insert_data}}"""
@dag(schedule_interval='0 * * * *', start_date=datetime(2022, 7, 13, 21, 52, 12), catchup=False, tags=[])
def jwst_temp_data():
    fetch_data = fetch_data_func()
    metrics = metrics_func(fetch_data)
    create_or_replace_table = create_or_replace_table_func()
    insert_data = insert_data_func(create_or_replace_table, metrics)
    confirm_insert = confirm_insert_func(insert_data, output_table=Table(conn_id='snowflake', name='tmp_astro_jwst_temp_data_confirm_insert'))
dag_obj = jwst_temp_data()
This one does NOT work:
"""
jwst_temp_data
DAG file auto-generated by AstroBuild.
"""
import time
from datetime import datetime
import pandas as pd
import requests
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.utils import timezone
from airflow.utils.dates import days_ago
from astro import sql as aql
from astro.files import File
from astro.sql.table import Table
API_URL = "https://www.jwst.nasa.gov/content/webbLaunch/flightCurrentState2.0.json"
@aql.dataframe()
def fetch_data_func():
    return requests.get(API_URL).json()['currentState']
@aql.dataframe()
def metrics_func(fetch_data: pd.DataFrame):
    temp_cool_side = fetch_data['tempCoolSide1C']
    temp_warm_side = fetch_data['tempWarmSide1C']
    delta = abs(temp_cool_side - temp_warm_side)
    return pd.DataFrame([{
        "datetime": str(datetime.now()),
        "temp_cool_side": temp_cool_side,
        "temp_warm_side": temp_warm_side,
        "delta": delta,
    }])
@aql.run_raw_sql(conn_id='snowflake')
def create_or_replace_table_func():
    return """CREATE OR REPLACE TABLE demo.jwst_temps (
"datetime" VARCHAR(50),
"temp_cool_side" NUMBER,
"temp_warm_side" NUMBER,
"delta" NUMBER
)"""
@aql.run_raw_sql(conn_id='snowflake')
def insert_data_func(metrics: Table, create_or_replace_table: Table):
    return """-- wait for: {{create_or_replace_table}}
INSERT INTO demo.jwst_temps
SELECT * FROM {{metrics}};"""
@aql.transform(conn_id='snowflake')
def confirm_insert_func(insert_data: Table):
    return """SELECT * FROM demo.jwst_temps;
-- wait for: {{insert_data}}"""
@dag(schedule_interval='0 * * * *', start_date=datetime(2022, 7, 13, 21, 52, 12), catchup=False, tags=[])
def jwst_temp_data():
    fetch_data = fetch_data_func()
    metrics = metrics_func(fetch_data)
    create_or_replace_table = create_or_replace_table_func()
    insert_data = insert_data_func(metrics, create_or_replace_table)
    confirm_insert = confirm_insert_func(insert_data, output_table=Table(conn_id='snowflake', name='tmp_astro_jwst_temp_data_confirm_insert'))
dag_obj = jwst_temp_data()
Here's the pipeline we used to generate the code:
https://cloud.astronomer.io/cku7t3fvx59046554xr4g0siv7r/build/a340e4cf-b239-4291-86dc-30c00274df05/b119d256-160d-4626-aaae-f82f6a602498
@jlaneve, when I took the generated DAG and tried to run it in an Airflow instance, I got the same error as before (namely, that the XCom lookup fails because there's nothing to retrieve). Can you try running this in Airflow and see if you get the same result? This suggests that the issue might be that the generated DAG doesn't work.
Thank you very much for the details, @jlaneve !
Adding to what @dimberman mentioned, the working example failed for me as well:
File "/Users/al-cht01/.virtualenvs/astro-sdk-tutorial/lib/python3.7/site-packages/airflow/models/abstractoperator.py", line 394, in render_template
return value.resolve(context)
File "/Users/al-cht01/.virtualenvs/astro-sdk-tutorial/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper
return func(*args, session=session, **kwargs)
File "/Users/al-cht01/.virtualenvs/astro-sdk-tutorial/lib/python3.7/site-packages/airflow/models/xcom_arg.py", line 153, in resolve
f'XComArg result from {self.operator.task_id} at {context["ti"].dag_id} '
airflow.exceptions.AirflowException: XComArg result from create_or_replace_table_func at jwst_temp_data with key="return_value" is not found!
This issue surfaced a limitation in the Astro Python SDK, and Daniel logged a follow-up ticket: https://github.com/astronomer/astro-sdk/issues/569
Additional information:
- @jlaneve confirmed that he couldn't run the complete DAG in Airflow.
- The "working" DAG seems to work only when each task is triggered individually in Airflow (which is how Astro Build runs it), rather than when the entire DAG is run.
- To reproduce the initial issue, we would need to run the individual task on its own.
- A workaround is to place the comment after the SQL statement.
For these reasons, we decided that this issue is not a blocker for the 1.0.0 release. I'm reopening it and assigning it to our next milestone, since the bug reported initially (leading run_raw_sql with a comment doesn't work) still seems to be a bug.
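The workaround listed above (placing the comment after the SQL statement) can be sketched as a small string transformation. This is a minimal illustration, assuming the query is a plain string and that only whole-line -- comments at the very top need to move; move_leading_comments_to_end is a hypothetical helper, not part of astro-sdk. Jinja placeholders such as {{create_or_replace_table}} are left untouched.

```python
def move_leading_comments_to_end(sql: str) -> str:
    """Move whole-line -- comments at the top of sql to the end (hypothetical helper)."""
    lines = sql.splitlines()
    leading = []
    # Collect leading comment lines (and skip blank lines) until the first SQL line.
    while lines and (not lines[0].strip() or lines[0].lstrip().startswith("--")):
        line = lines.pop(0)
        if line.strip():
            leading.append(line.strip())
    return "\n".join(lines + leading)


# Template from the DAG that does NOT work.
query = """-- wait for: {{create_or_replace_table}}
INSERT INTO demo.jwst_temps
SELECT * FROM {{metrics}};"""

print(move_leading_comments_to_end(query))
```

Applied to the failing insert_data_func template, this yields the same line order as the template in the DAG that works.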
Closing this, since we were not able to reproduce it locally.