PoC: Expose aql.transform as traditional Airflow operator
Context
As of 1.5.3, some existing Python SDK features are exposed as traditional Airflow operators (LoadFileOperator and ExportFileOperator). So far we have not encouraged using them this way, but we're changing that as part of https://github.com/astronomer/astro-sdk/issues/1853.
Some of the existing Astro SDK features may need more work before they can be exposed as traditional Airflow operators. The goal of this ticket is a PoC so that this feature is available not only as the decorator aql.transform, but also as a traditional TransformOperator.
Some challenges may include the following (the decorator sketch after this list shows where they come from):
- kwargs handling
- the current "magic" that converts between dataframes and SQL tables
- XCom handling
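For context, here is roughly how the decorator form is used today (a minimal sketch based on the SDK's example DAGs; the query, function, and table names are illustrative):

```python
from astro import sql as aql
from astro.sql.table import Table


@aql.transform
def top_animations(input_table: Table):
    # The SDK renders {{input_table}} from the function argument, runs the
    # query, and materializes the result into a new table that it pushes via
    # XCom: exactly the plumbing a traditional operator would have to expose
    # explicitly through its arguments.
    return """
        SELECT title, rating
        FROM {{input_table}}
        WHERE genre1='Animation'
        ORDER BY rating DESC
        LIMIT 5;
    """
```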
This ticket is a follow-up to a conversation with @manmeetkaur.
Proposal
```python
from datetime import datetime

from airflow import DAG
from astro.files import File
from astro.sql import LoadFileOperator, TransformOperator
from astro.sql.table import Table

imdb_file = File("https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv")
imdb_table = Table(conn_id="sqlite_default")
popular_animations_table = Table(name="top_five_animations")

with DAG(
    "calculate_popular_movies",
    schedule_interval=None,
    start_date=datetime(2000, 1, 1),
    catchup=False,
) as dag:
    load_file = LoadFileOperator(
        input_file=imdb_file,
        output_table=imdb_table,
    )
    transform_table = TransformOperator(
        sql="""
            SELECT title, rating
            FROM {{input_table}}
            WHERE genre1='Animation'
            ORDER BY rating DESC
            LIMIT 5;
        """,
        input_table=imdb_table,
        output_table=popular_animations_table,
    )
    load_file >> transform_table
```
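Assuming the operator mirrors the decorator's templating, `{{input_table}}` in the SQL above would be rendered from the `input_table` argument, just as the decorator renders it from the decorated function's parameters.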
Acceptance criteria
- [ ] Check the feasibility
- [ ] If straightforward:
  - [ ] expose this feature in the SDK
  - [ ] document it
  - [ ] create an example DAG
  - [ ] create a follow-up ticket to apply the same change to the other existing decorators
@tatiana I can take up this ticket if it is okay with you.
That's amazing, @manmeetkaur. Thank you very much. Please go ahead, and reach out if you have any questions!
@tatiana just one correction: as we discussed in our meeting, the "decorator"-category operators, including @dataframe, @transform, and @run_raw_sql, have special logic built in to make them work. Those could be de-prioritized, and we can focus on the low-hanging fruit of these operators: load_file, append, drop_table, export_to_file, merge, transform_file, check_column, check_table, get_file_list. Thoughts?
Tested Code:
```python
import os
from datetime import datetime

from airflow import DAG
from astro import sql as aql
from astro.files import File
from astro.sql import (
    AppendOperator,
    ColumnCheckOperator,
    DropTableOperator,
    ExportToFileOperator,
    LoadFileOperator,
    MergeOperator,
    SQLCheckOperator,
    TransformOperator,
)
from astro.sql.table import Table

imdb_file = File("https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv")
imdb_table = Table(name="imdb_data", conn_id="snowflake")
# popular_animations_table = Table(name="top_five_animations")
target_table = Table(name="target_imdb_data", conn_id="snowflake")
final_table = Table(name="final_imdb_data", conn_id="snowflake")
out_file = File(path=os.path.join(os.getcwd(), 'out.csv'))
with DAG(
    "calculate_popular_movies",
    schedule_interval=None,
    start_date=datetime(2000, 1, 1),
    catchup=False,
) as dag:
    load_file = LoadFileOperator(
        input_file=imdb_file,
        output_table=imdb_table,
    )
    append = AppendOperator(
        target_table=target_table,
        source_table=imdb_table,
        # columns=[...],  # optionally restrict to an explicit column list (same list as in merge below)
    )
    # This throws AttributeError: 'LoadFileOperator' object has no attribute
    # 'conn_id', because the Table returned by LoadFileOperator has no conn_id
    # attribute. Shouldn't it auto-use the conn_id of the input Table object?
    # drop = DropTableOperator(
    #     table=load_file
    # )
    export_to_file = ExportToFileOperator(
        task_id="export_file",
        output_file=out_file,
        input_data=target_table,
        if_exists="replace",
    )
    merge = MergeOperator(
        target_table=target_table,
        source_table=imdb_table,
        target_conflict_columns=["X"],
        if_conflicts="ignore",
        columns=["X", "TITLE", "RATING", "TOTALVOTES", "GENRE1", "GENRE2", "GENRE3", "METACRITIC", "BUDGET", "RUNTIME", "CVOTES10", "CVOTES09", "CVOTES08", "CVOTES07", "CVOTES06", "CVOTES05", "CVOTES04", "CVOTES03", "CVOTES02", "CVOTES01", "CVOTESMALE", "CVOTESFEMALE", "CVOTESU18", "CVOTESU18M", "CVOTESU18F", "CVOTES1829", "CVOTES1829M", "CVOTES1829F", "CVOTES3044", "CVOTES3044M", "CVOTES3044F", "CVOTES45A", "CVOTES45AM", "CVOTES45AF", "CVOTES1000", "CVOTESUS", "CVOTESNUS", "VOTESM", "VOTESF", "VOTESU18", "VOTESU18M", "VOTESU18F", "VOTES1829", "VOTES1829M", "VOTES1829F", "VOTES3044", "VOTES3044M", "VOTES3044F", "VOTES45A", "VOTES45AM", "VOTES45AF", "VOTESIMDB", "VOTES1000", "VOTESUS", "VOTESNUS", "DOMESTIC", "FOREIGN", "WORLDWIDE"],
    )
    transform = TransformOperator(
        sql="my_sql.sql",
        python_callable=lambda: ("my_sql.sql", {"input_table": target_table}),
        parameters={"input_table": target_table},
        op_kwargs={"output_table": final_table},
    )
    # Works when referencing the Table object directly, instead of the
    # LoadFileOperator output as attempted above.
    drop = DropTableOperator(
        table=imdb_table,
    )
    column_check = ColumnCheckOperator(
        dataset=target_table,
        column_mapping={
            "x": {"null_check": {"geq_to": 0, "leq_to": 1}},
            "title": {
                "null_check": {
                    "equal_to": 0,
                },
            },
        },
    )
    table_check = SQLCheckOperator(
        dataset=final_table,
        checks={
            "row_count": {"check_statement": "count(*) > 10"},
        },
    )

    load_file >> append >> export_to_file >> merge >> drop >> transform >> column_check >> table_check
    aql.cleanup()
```
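Note that `aql.cleanup()` is declared inside the DAG: it adds a task that deletes the temporary tables the SDK creates once the tasks using them have finished.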