astro-sdk
Create a custom Astro SDK DAG class which magically cleans up
Context
Before implementing the cleanup task, there were several discussions about how we could delete temporary tables:
https://github.com/astronomer/astro-sdk/blob/main/aep/AEP-2-table-cleanup.md
At the time, we opted for an explicit cleanup task, as explained in the Architecture Enhancement Proposal (AEP-2) above.
Some users, including @manmeetkaur, have asked whether we could offer an implicit way of cleaning up temporary tables instead.
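For context, the explicit approach looks roughly like the sketch below: the user adds an aql.cleanup() task to drop the temporary tables created during the DAG run. This is only an illustration; import paths, connection IDs, and file paths are placeholders and may differ between SDK versions.

```python
from datetime import datetime

from airflow import DAG
from astro import sql as aql
from astro.files import File
from astro.table import Table

with DAG(
    dag_id="example_explicit_cleanup",
    start_date=datetime(2019, 1, 1),
    schedule_interval=None,
) as dag:
    homes_table = aql.load_file(
        input_file=File(path="s3://my-bucket/homes.csv"),  # placeholder path
        output_table=Table(conn_id="postgres_conn"),  # unnamed, i.e. temporary, table
    )
    # Explicit cleanup task the user must remember to add today.
    aql.cleanup()
```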
Proposal
Offer users, in addition to the existing cleanup operator, a custom DAG class.
Similar to how the Marquez project handled Airflow pre-2.0, we can offer our own custom DAG object. This would allow us to supply our custom callbacks transparently to the user:
```python
from datetime import datetime, timedelta

from astro import sql as aql
from astro.dag import DAG  # proposed custom DAG class
from astro.files import File
from astro.table import Table

# default_args, s3_bucket, sample_create_table (an @aql.transform function) and
# my_df_func (an @aql.dataframe function) are assumed to be defined elsewhere.
dag = DAG(
    dag_id="example_amazon_s3_postgres",
    start_date=datetime(2019, 1, 1),
    max_active_runs=3,
    schedule_interval=timedelta(minutes=30),
    default_args=default_args,
)

with dag:
    my_homes_table = aql.load_file(
        input_file=File(path=f"{s3_bucket}/homes.csv"),
        output_table=Table(
            conn_id="postgres_conn",
        ),
    )
    sample_table = sample_create_table(my_homes_table)
    my_df_func(sample_table)
```
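One possible implementation sketch (not an agreed design): subclass Airflow's DAG and append an aql.cleanup() task when the `with dag:` block closes, so temporary tables are dropped without the user ever writing the cleanup task themselves. The class name and hook point below are assumptions.

```python
from airflow.models.dag import DAG as AirflowDAG

from astro import sql as aql


class DAG(AirflowDAG):
    """Airflow DAG that transparently adds an astro-sdk cleanup task."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        # The DAG is still the active context manager at this point, so the
        # cleanup operator created here attaches to this DAG automatically.
        aql.cleanup()
        super().__exit__(exc_type, exc_val, exc_tb)
```

A limitation of hooking `__exit__` is that it only covers DAGs defined via the `with dag:` pattern; DAGs built by passing `dag=` to each task would need a different hook.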
Concerns
- This feature may be hard to maintain if the upstream Airflow DAG class changes significantly.
- Some users may feel uneasy about not using the standard DAG class (they can continue to use the explicit cleanup task instead).