astro-sdk
OpenLineage not working on a DAG
Describe the bug
The following example DAG does not populate any lineage:
"""
### Simple EL Pipeline with Data Quality Checks Using Snowflake
Runs a data quality check, in SQL, on the forest fires dataset
Note that this DAG will clean up after itself and delete all data it uploads.
Ensure a Snowflake Warehouse, Database, Schema, and Role exist for the Snowflake
connection provided to the Operator. The names of these data should replace the
dummy values at the top of the file.
A Snowflake Connection is also needed, named `snowflake_default`.
What makes this a simple data quality case is:
1. Absolute ground truth: the local CSV file is considered perfect and immutable.
2. No transformations or business logic.
3. Exact values of data to quality check are known.
"""
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator
from airflow.providers.common.sql.operators.sql import (
    SQLColumnCheckOperator,
    SQLTableCheckOperator,
)
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime
from airflow.utils.task_group import TaskGroup
from astro import sql as aql
from astro.sql.table import Table
SNOWFLAKE_FORESTFIRE_TABLE = "forestfires"
SNOWFLAKE_CONN_ID = "snowflake_default"
def query(sql: str) -> Table:
    """
    Query raw SQL and return the result as a Table.

    Parameter:
        sql (str): Query
    Return:
        sql (Table)
    """
    return sql


sql = f"SELECT * FROM {SNOWFLAKE_FORESTFIRE_TABLE}"
def subset_data(
    input_table, column_name: str = None, value=None
):
    """
    Take a dataframe and return the subset where column == value.

    Parameter:
        input_table (DataFrame): Dataframe to filter results on.
        column_name (str): Dataframe column name to be filtered.
        value (str/int/bool): Data value the dataframe subset should
            equal. Datatype dependent on column.
    Return:
        sub_df (DataFrame)
    """
    if column_name is None:
        sub_df = input_table
    elif column_name in input_table.columns:
        sub_df = input_table[input_table[column_name] == value]
    else:
        raise KeyError(column_name)
    return sub_df
with DAG(
    "simple_snowflake",
    description="Example DAG showcasing loading and data quality checking with Snowflake.",
    doc_md=__doc__,
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    template_searchpath="/usr/local/airflow/include/sql/snowflake_examples/",
    catchup=False,
) as dag:
    """
    #### Snowflake table creation
    Create the table to store sample forest fire data.
    """
    create_table = SnowflakeOperator(
        task_id="create_table",
        sql="{% include 'create_forestfire_table.sql' %}",
        params={"table_name": SNOWFLAKE_FORESTFIRE_TABLE},
    )

    """
    #### Insert data
    Insert data into the Snowflake table using an existing SQL query (stored in
    the include/sql/snowflake_examples/ directory).
    """
    load_data = SnowflakeOperator(
        task_id="insert_query",
        sql="{% include 'load_snowflake_forestfire_data.sql' %}",
        params={"table_name": SNOWFLAKE_FORESTFIRE_TABLE},
    )

    get_data = aql.transform(
        conn_id=SNOWFLAKE_CONN_ID,
        dag=dag,
        task_id="query_data",
    )(query)(sql=sql)

    df_task_id = "filter_to_forestfire_df"
    convert_to_df = aql.dataframe(
        conn_id=SNOWFLAKE_CONN_ID,
        dag=dag,
        task_id=df_task_id,
    )(subset_data)(
        input_table=get_data,
        column_name=None,
        value=None,
    )

    """
    #### Delete table
    Clean up the table created for the example.
    """
    delete_table = SnowflakeOperator(
        task_id="delete_table",
        sql="{% include 'delete_snowflake_table.sql' %}",
        params={"table_name": SNOWFLAKE_FORESTFIRE_TABLE},
    )

    begin = EmptyOperator(task_id="begin")
    end = EmptyOperator(task_id="end")

    chain(begin, create_table, load_data, get_data, convert_to_df, delete_table, end)
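
For completeness, the `snowflake_default` connection referenced in the docstring can be supplied through Airflow's environment-variable mechanism. This is a minimal sketch, assuming Airflow 2.3+ (which accepts JSON-serialized connections) and the Snowflake provider; all values are placeholders and the exact extra field names depend on the provider version:

AIRFLOW_CONN_SNOWFLAKE_DEFAULT='{"conn_type": "snowflake", "login": "<user>", "password": "<password>", "schema": "<schema>", "extra": {"account": "<account>", "warehouse": "<warehouse>", "database": "<database>", "role": "<role>"}}'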
Environment variables
ENV=sandbox
OPENLINEAGE_URL=http://host.docker.internal:5000/
OPENLINEAGE_NAMESPACE=local
#OPENLINEAGE_API_KEY=blank
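
For reference, when lineage is emitted by the openlineage-airflow package (Airflow versions before the built-in provider), the lineage backend usually also has to be configured. The value below is the one documented for that package and is listed only as a point of comparison, not as the confirmed cause of the issue:

AIRFLOW__LINEAGE__BACKEND=openlineage.lineage_backend.OpenLineageBackend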
The SQL and data files needed to run the DAG above are in the include/ directory here.
More details at: https://astronomer.slack.com/archives/C03868KGF2Q/p1680227573901929
Expected behavior
- OpenLineage should run properly and populate lineage for the DAG's tasks.
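
One way to check whether any events reach the backend at OPENLINEAGE_URL is to query it directly. This is a minimal sketch, assuming the backend is Marquez and exposes its standard REST API; the URL and namespace are taken from the environment variables above:

import requests

OPENLINEAGE_URL = "http://host.docker.internal:5000"
OPENLINEAGE_NAMESPACE = "local"

# List the jobs registered for the namespace; an empty list after a DAG run
# suggests no OpenLineage events were received at all.
resp = requests.get(f"{OPENLINEAGE_URL}/api/v1/namespaces/{OPENLINEAGE_NAMESPACE}/jobs")
resp.raise_for_status()
for job in resp.json().get("jobs", []):
    print(job["id"]["name"], (job.get("latestRun") or {}).get("state"))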