
PoC: Expose aql.transform as traditional Airflow operator

Open tatiana opened this issue 1 year ago • 4 comments

Context

As of 1.5.3, some existing Python SDK features are exposed as traditional Airflow operators (LoadFileOperator and ExportFileOperator). So far we haven't encouraged users to use them this way, but that is changing as part of https://github.com/astronomer/astro-sdk/issues/1853.

Some of the existing Astro SDK features may need more work before they can be exposed as traditional Airflow operators. The goal of this ticket is a PoC so that, alongside the aql.transform decorator, there is also a traditional TransformOperator.

Some challenges may include the following:

  • kwargs
  • the current magic of converting between dataframes and SQL tables
  • XCom handling
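
For context, this sketch (adapted from the astro-sdk documentation; a minimal example, not part of the proposal) shows what the decorator currently does for users: the function's return value is rendered as SQL, input_table is templated in by the SDK, and the result is materialized as a Table that downstream tasks can consume.

from astro import sql as aql
from astro.sql.table import Table


@aql.transform
def top_five_animations(input_table: Table):
    # The returned string is rendered as SQL; the SDK resolves
    # {{ input_table }} and materializes the result as a new Table.
    return """
        SELECT title, rating
        FROM {{ input_table }}
        WHERE genre1='Animation'
        ORDER BY rating DESC
        LIMIT 5;
    """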

This ticket is a follow-up to a conversation with @manmeetkaur.

Proposal

from datetime import datetime

from airflow import DAG

from astro.files import File
from astro.sql import LoadFileOperator, TransformOperator
from astro.sql.table import Table


imdb_file = File("https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv")
imdb_table = Table(conn_id="sqlite_default")
popular_animations_table = Table(name="top_five_animations")

with DAG(
    "calculate_popular_movies",
    schedule_interval=None,
    start_date=datetime(2000, 1, 1),
    catchup=False,
) as dag:

    load_file = LoadFileOperator(
        input_file=imdb_file,
        output_table=imdb_table
    )

    transform_table = TransformOperator(
        sql="""
            SELECT title, rating
            FROM {{input_table}}
            WHERE genre1='Animation'
            ORDER BY rating DESC
            LIMIT 5;
        """,
        input_table=imdb_table,
        output_table=popular_animations_table,
    )

    load_file >> transform_table    
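
One concrete instance of the XCom-handling challenge listed above: in the sketch, both tasks share the module-level Table objects and the dependency is declared explicitly. In an idiomatic traditional-operator DAG, the downstream task would instead consume the upstream result via XCom, e.g. load_file.output (an Airflow XComArg); whether TransformOperator can resolve such a reference into a Table is exactly what the PoC needs to verify. A hypothetical, untested variant:

    transform_table = TransformOperator(
        sql="""
            SELECT title, rating
            FROM {{input_table}}
            WHERE genre1='Animation'
            ORDER BY rating DESC
            LIMIT 5;
        """,
        # Hypothetical: consume the upstream result as an XComArg instead of
        # sharing the module-level Table object; this would also make the
        # explicit load_file >> transform_table dependency unnecessary.
        input_table=load_file.output,
        output_table=popular_animations_table,
    )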

Acceptance criteria

  • [ ] Check the feasibility
  • [ ] If straightforward:
    • [ ] expose this feature in the SDK
    • [ ] document it
    • [ ] create an example DAG
    • [ ] create a follow-up to apply the same change to the other existing decorators

tatiana avatar Mar 17 '23 10:03 tatiana

@tatiana I can take up this ticket if it is okay with you.

manmeetkaur avatar Mar 21 '23 05:03 manmeetkaur

That's amazing, @manmeetkaur. Thank you very much! Please go ahead, and reach out if you have any questions!

tatiana avatar Mar 21 '23 07:03 tatiana

@tatiana just one correction: as we discussed in our meeting, the "decorator"-category operators, including @dataframe, @transform, and @run_raw_sql, have special logic included to make them work. That could be de-prioritized so we can focus on the low-hanging fruit of these operators: load_file, append, drop_table, export_to_file, merge, transform_file, check_column, check_table, get_file_list. Thoughts?

manmeetkaur avatar Mar 21 '23 12:03 manmeetkaur

Tested Code:

import os
from datetime import datetime

from airflow import DAG

from astro import sql as aql
from astro.files import File
from astro.sql import (
    AppendOperator,
    ColumnCheckOperator,
    DropTableOperator,
    ExportToFileOperator,
    LoadFileOperator,
    MergeOperator,
    SQLCheckOperator,
    TransformOperator,
)
from astro.sql.table import Table

imdb_file = File("https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv")
imdb_table = Table(name="imdb_data", conn_id="snowflake")
# popular_animations_table = Table(name="top_five_animations")
target_table = Table(name="target_imdb_data", conn_id="snowflake")
final_table = Table(name="final_imdb_data", conn_id="snowflake")
out_file = File(path=os.path.join(os.getcwd(), 'out.csv'))


with DAG(
    "calculate_popular_movies",
    schedule_interval=None,
    start_date=datetime(2000, 1, 1),
    catchup=False,
) as dag:

    load_file = LoadFileOperator(
        input_file=imdb_file,
        output_table=imdb_table
    )

    append = AppendOperator(
        target_table=target_table,
        source_table=imdb_table,
        # columns = ["X", "TITLE", "RATING", "TOTALVOTES", "GENRE1", "GENRE2", "GENRE3", "METACRITIC", "BUDGET", "RUNTIME", "CVOTES10", "CVOTES09", "CVOTES08", "CVOTES07", "CVOTES06", "CVOTES05", "CVOTES04", "CVOTES03", "CVOTES02", "CVOTES01", "CVOTESMALE", "CVOTESFEMALE", "CVOTESU18", "CVOTESU18M", "CVOTESU18F", "CVOTES1829", "CVOTES1829M", "CVOTES1829F", "CVOTES3044", "CVOTES3044M", "CVOTES3044F", "CVOTES45A", "CVOTES45AM", "CVOTES45AF", "CVOTES1000", "CVOTESUS", "CVOTESNUS", "VOTESM", "VOTESF", "VOTESU18", "VOTESU18M", "VOTESU18F", "VOTES1829", "VOTES1829M", "VOTES1829F", "VOTES3044", "VOTES3044M", "VOTES3044F", "VOTES45A", "VOTES45AM", "VOTES45AF", "VOTESIMDB", "VOTES1000", "VOTESUS", "VOTESNUS", "DOMESTIC", "FOREIGN", "WORLDWIDE"]
    )


    ## This throws "AttributeError: 'LoadFileOperator' object has no attribute 'conn_id'"
    ## because the Table returned by LoadFileOperator has no conn_id attribute.
    ## Shouldn't it automatically use the conn_id of the input Table object?
    # drop = DropTableOperator(
    #     table=load_file
    # )
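    ## Workaround used instead: reference the module-level Table object (which
    ## carries conn_id) rather than the operator's return value; see the `drop`
    ## task further below.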

    export_to_file = ExportToFileOperator(
        task_id="export_file",
        output_file=out_file,
        input_data=target_table,
        if_exists="replace",
    )

    merge = MergeOperator(
        target_table=target_table,
        source_table=imdb_table,
        target_conflict_columns=["X"],
        if_conflicts="ignore",
        columns = ["X", "TITLE", "RATING", "TOTALVOTES", "GENRE1", "GENRE2", "GENRE3", "METACRITIC", "BUDGET", "RUNTIME", "CVOTES10", "CVOTES09", "CVOTES08", "CVOTES07", "CVOTES06", "CVOTES05", "CVOTES04", "CVOTES03", "CVOTES02", "CVOTES01", "CVOTESMALE", "CVOTESFEMALE", "CVOTESU18", "CVOTESU18M", "CVOTESU18F", "CVOTES1829", "CVOTES1829M", "CVOTES1829F", "CVOTES3044", "CVOTES3044M", "CVOTES3044F", "CVOTES45A", "CVOTES45AM", "CVOTES45AF", "CVOTES1000", "CVOTESUS", "CVOTESNUS", "VOTESM", "VOTESF", "VOTESU18", "VOTESU18M", "VOTESU18F", "VOTES1829", "VOTES1829M", "VOTES1829F", "VOTES3044", "VOTES3044M", "VOTES3044F", "VOTES45A", "VOTES45AM", "VOTES45AF", "VOTESIMDB", "VOTES1000", "VOTESUS", "VOTESNUS", "DOMESTIC", "FOREIGN", "WORLDWIDE"]
    )

    transform = TransformOperator(
        # Note: the python_callable here appears to mimic what the
        # @aql.transform decorator would supply: it returns the SQL file path
        # together with the template parameters, while op_kwargs carries the
        # output_table.
        sql="my_sql.sql",
        python_callable=lambda: ("my_sql.sql", {"input_table": target_table}),
        parameters={"input_table": target_table},
        op_kwargs={"output_table": final_table},
    )

    drop = DropTableOperator(
        table=imdb_table
    )

    column_check = ColumnCheckOperator(
        dataset=target_table,
        # null_check counts NULLs per column: `x` may have between 0 and 1
        # NULL values, while `title` must have none.
        column_mapping={
            "x": {"null_check": {"geq_to": 0, "leq_to": 1}},
            "title": {
                "null_check": {
                    "equal_to": 0,
                },
            },
        },
    )

    table_check = SQLCheckOperator(
        dataset=final_table,
        # Table-level check: the final table must contain more than 10 rows.
        checks={
            "row_count": {"check_statement": "count(*) > 10"},
        },
    )

    load_file >> append >> export_to_file >> merge >> drop >> transform >> column_check >> table_check

    # Add a final task that drops the temporary tables the SDK created
    # during the run.
    aql.cleanup()

manmeetkaur avatar Apr 05 '23 10:04 manmeetkaur