
OpenLineage not working on a DAG

sunank200 opened this issue 2 years ago

Describe the bug

The following example DAG does not populate lineage:

"""
### Simple EL Pipeline with Data Quality Checks Using Snowflake

Runs a data quality check, in SQL, on the forest fires dataset

Note that this DAG will clean up after itself and delete all data it uploads.

Ensure a Snowflake Warehouse, Database, Schema, and Role exist for the Snowflake
connection provided to the Operator. The names of these data should replace the
dummy values at the top of the file.

A Snowflake Connection is also needed, named `snowflake_default`.

What makes this a simple data quality case is:
1. Absolute ground truth: the local CSV file is considered perfect and immutable.
2. No transformations or business logic.
3. Exact values of data to quality check are known.
"""

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator
from airflow.providers.common.sql.operators.sql import (SQLColumnCheckOperator,
                                                        SQLTableCheckOperator)
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime
from airflow.utils.task_group import TaskGroup

from astro import sql as aql
from astro.sql.table import Table


SNOWFLAKE_FORESTFIRE_TABLE = "forestfires"
SNOWFLAKE_CONN_ID = "snowflake_default"

def query(sql: str) -> Table:
    """
    Query raw SQL and return result as a Table.
        Parameter:
            sql (str): Query
        Return:
            sql (Table)
    """
    return sql

sql = f"SELECT * FROM {SNOWFLAKE_FORESTFIRE_TABLE}"

def subset_data(
    input_table, column_name: str = None, value = None
):
    """
    Take dataframe and return subset where column == value.
        Parameter:
            input_table (DataFrame): Dataframe to filter results on.
            column_name (str): Dataframe column name to be filtered.
            value (str/int/bool): Data value dataframe subset should
                                  equal. Datatype dependent on column.
        Return:
            sub_df (DataFrame)
    """
    if column_name is None:
        sub_df = input_table
    elif column_name in input_table.columns:
        sub_df = input_table[input_table[column_name] == value]
    else:
        raise KeyError(f"Column '{column_name}' not found in input_table")

    return sub_df

with DAG(
    "simple_snowflake",
    description="Example DAG showcasing loading and data quality checking with Snowflake.",
    doc_md=__doc__,
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    template_searchpath="/usr/local/airflow/include/sql/snowflake_examples/",
    catchup=False,
) as dag:

    """
    #### Snowflake table creation
    Create the table to store sample forest fire data.
    """
    create_table = SnowflakeOperator(
        task_id="create_table",
        sql="{% include 'create_forestfire_table.sql' %}",
        params={"table_name": SNOWFLAKE_FORESTFIRE_TABLE},
    )

    """
    #### Insert data
    Insert data into the Snowflake table using an existing SQL query (stored in
    the include/sql/snowflake_examples/ directory).
    """
    load_data = SnowflakeOperator(
        task_id="insert_query",
        sql="{% include 'load_snowflake_forestfire_data.sql' %}",
        params={"table_name": SNOWFLAKE_FORESTFIRE_TABLE},
    )

    get_data = aql.transform(
        conn_id=SNOWFLAKE_CONN_ID,
        dag=dag,
        task_id="query_data",
    )(query)(sql=sql)

    df_task_id = "filter_to_forestfire_df"

    convert_to_df = aql.dataframe(
        conn_id=SNOWFLAKE_CONN_ID,
        dag=dag,
        task_id=df_task_id,
    )(subset_data)(
        input_table=get_data,
        column_name=None,
        value=None,
    )

    """
    #### Delete table
    Clean up the table created for the example.
    """
    delete_table = SnowflakeOperator(
        task_id="delete_table",
        sql="{% include 'delete_snowflake_table.sql' %}",
        params={"table_name": SNOWFLAKE_FORESTFIRE_TABLE},
    )

    begin = EmptyOperator(task_id="begin")
    end = EmptyOperator(task_id="end")

    chain(begin, create_table, load_data, get_data, convert_to_df, delete_table, end)

Environment variables

ENV=sandbox
OPENLINEAGE_URL=http://host.docker.internal:5000/
OPENLINEAGE_NAMESPACE=local
#OPENLINEAGE_API_KEY=blank
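
These variables configure the openlineage-airflow HTTP transport. As a quick sanity check that the lineage backend is reachable from inside the Airflow containers, here is a minimal sketch, assuming the OPENLINEAGE_URL above points at a Marquez instance (the /api/v1/namespaces endpoint belongs to the Marquez API and is an assumption, not part of this DAG):

import json
import os
import urllib.request

# OPENLINEAGE_URL as configured above; the default matches the value from this issue.
base_url = os.environ.get("OPENLINEAGE_URL", "http://host.docker.internal:5000").rstrip("/")

# Marquez (assumed backend) lists registered namespaces at /api/v1/namespaces.
with urllib.request.urlopen(f"{base_url}/api/v1/namespaces", timeout=10) as resp:
    namespaces = json.load(resp)["namespaces"]

print([ns["name"] for ns in namespaces])  # "local" should appear once events arrive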

The SQL and data files needed to run the DAG above are in the include/ directory here

More details at: https://astronomer.slack.com/archives/C03868KGF2Q/p1680227573901929

Expected behavior

  • OpenLineage should emit lineage for this DAG (see the check sketched below).
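
One way to confirm whether lineage was captured, again assuming the backend behind OPENLINEAGE_URL is Marquez (the /api/v1/namespaces/{namespace}/jobs endpoint is a Marquez API assumption): list the jobs in the configured namespace and look for this DAG's tasks, e.g. simple_snowflake.query_data and simple_snowflake.filter_to_forestfire_df.

import json
import os
import urllib.request

base_url = os.environ.get("OPENLINEAGE_URL", "http://host.docker.internal:5000").rstrip("/")
namespace = os.environ.get("OPENLINEAGE_NAMESPACE", "local")

# Marquez (assumed backend) exposes jobs per namespace; tasks that emitted
# lineage appear as "<dag_id>.<task_id>".
with urllib.request.urlopen(f"{base_url}/api/v1/namespaces/{namespace}/jobs", timeout=10) as resp:
    jobs = json.load(resp)["jobs"]

print([job["name"] for job in jobs])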

sunank200 · Apr 03 '23 06:04